[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-26 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93898801
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogSuite.scala
 ---
@@ -860,6 +864,24 @@ abstract class CatalogTestUtils {
   bucketSpec = Some(BucketSpec(4, Seq("col1"), Nil)))
   }
 
+  def newView(name: String, database: Option[String] = None): CatalogTable = {
+CatalogTable(
+  identifier = TableIdentifier(name, database),
+  tableType = CatalogTableType.VIEW,
+  storage = CatalogStorageFormat.empty,
+  schema = new StructType()
+.add("col1", "int")
+.add("col2", "string")
+.add("a", "int")
+.add("b", "string"),
+  provider = Some("hive"),
--- End diff --

Do we need a case where the provider is not set?
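
A minimal sketch of such a case, modeled on the `newView` helper quoted above; the `newViewWithoutProvider` name and exact shape are hypothetical:

```scala
// Hypothetical test helper: same schema shape as newView, but with the
// provider deliberately left unset, to cover views whose CatalogTable
// does not record a provider.
def newViewWithoutProvider(name: String, database: Option[String] = None): CatalogTable = {
  CatalogTable(
    identifier = TableIdentifier(name, database),
    tableType = CatalogTableType.VIEW,
    storage = CatalogStorageFormat.empty,
    schema = new StructType()
      .add("col1", "int")
      .add("col2", "string"),
    provider = None)
}
```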





[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-26 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93898653
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -551,17 +551,26 @@ class SessionCatalog(
*
* If a database is specified in `name`, this will return the table/view from that database.
* If no database is specified, this will first attempt to return a temporary table/view with
-   * the same name, then, if that does not exist, return the table/view from the current database.
+   * the same name, then, if that does not exist, and defaultDatabase is defined, return the
+   * table/view from the defaultDatabase, else return the table/view from the catalog.currentDb.
*
* Note that, the global temp view database is also valid here, this will return the global temp
* view matching the given name.
*
-   * If the relation is a view, the relation will be wrapped in a [[SubqueryAlias]] which will
-   * track the name of the view.
+   * If the relation is a view, we generate a [[View]] operator from the view description, and
+   * wrap the logical plan in a [[SubqueryAlias]] which will track the name of the view.
+   *
+   * @param name The name of the table/view that we lookup.
+   * @param alias The alias name of the table/view that we lookup.
+   * @param defaultDatabase The database name we should use to lookup the table/view, if the
+   *database part of [[TableIdentifier]] is not defined.
--- End diff --

Let's also explain the precedence (db name in the table identifier -> 
default db -> current db). 
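
A tiny standalone illustration of that precedence; the helper below is hypothetical, and the names `defaultDatabase` and `currentDb` follow the diff above:

```scala
// Hypothetical helper that only spells out the lookup order:
// 1. the database part of the TableIdentifier, if defined
// 2. otherwise the defaultDatabase passed to lookupRelation, if defined
// 3. otherwise the session catalog's current database (currentDb)
def resolveDatabase(
    identifierDb: Option[String],
    defaultDatabase: Option[String],
    currentDb: String): String =
  identifierDb.getOrElse(defaultDatabase.getOrElse(currentDb))
```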





[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-26 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93898562
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -658,6 +720,17 @@ class Analyzer(
   Generate(newG.asInstanceOf[Generator], join, outer, qualifier, 
output, child)
 }
 
+  // A special case for View, replace the output attributes with the attributes that have the
+  // same names from the child. If the corresponding attribute is not found, throw an
+  // AnalysisException.
+  // TODO: Also check the dataTypes and nullabilities of the output.
--- End diff --

Can we also explain why we need to replace output attributes in the code 
comment?
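
For reference, the replacement being described can be pictured with a simplified, self-contained sketch; it uses plain strings and a generic type instead of Catalyst attributes, so it only illustrates the name-based matching and the failure on a missing name (the real rule throws an AnalysisException):

```scala
// Illustrative only: pick, for every output name of the view, the child
// attribute with the same name; a missing name is an error.
def replaceViewOutput[A](
    viewOutputNames: Seq[String],
    childAttributesByName: Map[String, A]): Seq[A] = {
  viewOutputNames.map { name =>
    childAttributesByName.getOrElse(
      name,
      throw new IllegalArgumentException(
        s"Attribute '$name' of the view was not found in the child plan"))
  }
}
```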





[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-26 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93898349
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -510,32 +510,94 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the 
catalog.
*/
   object ResolveRelations extends Rule[LogicalPlan] {
-private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan 
= {
+
+def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
+i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+  case u: UnresolvedRelation => resolveRelation(u)
+}
+
+// If the unresolved relation is running directly on files, we just 
return the original
+// UnresolvedRelation, the plan will get resolved later. Else we look 
up the table from catalog
+// and change the default database name if it is a view.
+//
+// Note this is compatible with the views defined by older versions of 
Spark(before 2.2), which
+// have empty defaultDatabase and all the relations in viewText have 
database part defined.
+def resolveRelation(
+plan: LogicalPlan,
+defaultDatabase: Option[String] = None): LogicalPlan = plan match {
+  case u @ UnresolvedRelation(table: TableIdentifier, _) if 
isRunningDirectlyOnFiles(table) =>
+u
+  case u: UnresolvedRelation =>
+resolveView(lookupTableFromCatalog(u, defaultDatabase))
+}
+
+// Look up the table with the given name from catalog. If 
`defaultDatabase` is set, we look up
+// the table in the database `defaultDatabase`, else we follow the 
default way.
+private def lookupTableFromCatalog(
+u: UnresolvedRelation,
+defaultDatabase: Option[String] = None): LogicalPlan = {
   try {
-catalog.lookupRelation(u.tableIdentifier, u.alias)
+catalog.lookupRelation(u.tableIdentifier, u.alias, defaultDatabase)
   } catch {
 case _: NoSuchTableException =>
   u.failAnalysis(s"Table or view not found: ${u.tableName}")
   }
 }
 
-def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
-i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
-  case u: UnresolvedRelation =>
-val table = u.tableIdentifier
-if (table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
-(!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))) {
-  // If the database part is specified, and we support running SQL 
directly on files, and
-  // it's not a temporary view, and the table does not exist, then 
let's just return the
-  // original UnresolvedRelation. It is possible we are matching a 
query like "select *
-  // from parquet.`/path/to/query`". The plan will get resolved 
later.
-  // Note that we are testing (!db_exists || !table_exists) 
because the catalog throws
-  // an exception from tableExists if the database does not exist.
-  u
-} else {
-  lookupTableFromCatalog(u)
+// If the database part is specified, and we support running SQL 
directly on files, and
+// it's not a temporary view, and the table does not exist, then let's 
just return the
+// original UnresolvedRelation. It is possible we are matching a query 
like "select *
+// from parquet.`/path/to/query`". The plan will get resolved later.
+// Note that we are testing (!db_exists || !table_exists) because the 
catalog throws
+// an exception from tableExists if the database does not exist.
+private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean 
= {
+  table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
+(!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))
+}
+
+// Change the default database name if the plan is a view, and 
transformDown with the new
+// database name to resolve all UnresolvedRelations and Views.
+// If the view is defined in a DataSource other than Hive, and the 
view's child is empty,
+// set the view's child to a SimpleCatalogRelation, else throw an 
AnalysisException.
+def resolveView(plan: LogicalPlan): LogicalPlan = plan match

[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-26 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93898300
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -510,32 +510,94 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the 
catalog.
*/
   object ResolveRelations extends Rule[LogicalPlan] {
-private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan 
= {
+
+def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
+i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+  case u: UnresolvedRelation => resolveRelation(u)
+}
+
+// If the unresolved relation is running directly on files, we just 
return the original
+// UnresolvedRelation, the plan will get resolved later. Else we look 
up the table from catalog
+// and change the default database name if it is a view.
+//
+// Note this is compatible with the views defined by older versions of 
Spark(before 2.2), which
+// have empty defaultDatabase and all the relations in viewText have 
database part defined.
+def resolveRelation(
+plan: LogicalPlan,
+defaultDatabase: Option[String] = None): LogicalPlan = plan match {
+  case u @ UnresolvedRelation(table: TableIdentifier, _) if 
isRunningDirectlyOnFiles(table) =>
+u
+  case u: UnresolvedRelation =>
+resolveView(lookupTableFromCatalog(u, defaultDatabase))
+}
+
+// Look up the table with the given name from catalog. If 
`defaultDatabase` is set, we look up
+// the table in the database `defaultDatabase`, else we follow the 
default way.
+private def lookupTableFromCatalog(
+u: UnresolvedRelation,
+defaultDatabase: Option[String] = None): LogicalPlan = {
   try {
-catalog.lookupRelation(u.tableIdentifier, u.alias)
+catalog.lookupRelation(u.tableIdentifier, u.alias, defaultDatabase)
   } catch {
 case _: NoSuchTableException =>
   u.failAnalysis(s"Table or view not found: ${u.tableName}")
   }
 }
 
-def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
-i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
-  case u: UnresolvedRelation =>
-val table = u.tableIdentifier
-if (table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
-(!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))) {
-  // If the database part is specified, and we support running SQL 
directly on files, and
-  // it's not a temporary view, and the table does not exist, then 
let's just return the
-  // original UnresolvedRelation. It is possible we are matching a 
query like "select *
-  // from parquet.`/path/to/query`". The plan will get resolved 
later.
-  // Note that we are testing (!db_exists || !table_exists) 
because the catalog throws
-  // an exception from tableExists if the database does not exist.
-  u
-} else {
-  lookupTableFromCatalog(u)
+// If the database part is specified, and we support running SQL 
directly on files, and
+// it's not a temporary view, and the table does not exist, then let's 
just return the
+// original UnresolvedRelation. It is possible we are matching a query 
like "select *
+// from parquet.`/path/to/query`". The plan will get resolved later.
+// Note that we are testing (!db_exists || !table_exists) because the 
catalog throws
+// an exception from tableExists if the database does not exist.
+private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean 
= {
+  table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
+(!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))
+}
+
+// Change the default database name if the plan is a view, and 
transformDown with the new
+// database name to resolve all UnresolvedRelations and Views.
+// If the view is defined in a DataSource other than Hive, and the 
view's child is empty,
+// set the view's child to a SimpleCatalogRelation, else throw an 
AnalysisException.
+def resolveView(plan: LogicalPlan): LogicalPlan = plan match

[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-26 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93898210
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -510,32 +510,94 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the 
catalog.
*/
   object ResolveRelations extends Rule[LogicalPlan] {
-private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan 
= {
+
+def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
+i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+  case u: UnresolvedRelation => resolveRelation(u)
+}
+
+// If the unresolved relation is running directly on files, we just 
return the original
+// UnresolvedRelation, the plan will get resolved later. Else we look 
up the table from catalog
+// and change the default database name if it is a view.
+//
+// Note this is compatible with the views defined by older versions of 
Spark(before 2.2), which
+// have empty defaultDatabase and all the relations in viewText have 
database part defined.
+def resolveRelation(
+plan: LogicalPlan,
+defaultDatabase: Option[String] = None): LogicalPlan = plan match {
+  case u @ UnresolvedRelation(table: TableIdentifier, _) if 
isRunningDirectlyOnFiles(table) =>
+u
+  case u: UnresolvedRelation =>
+resolveView(lookupTableFromCatalog(u, defaultDatabase))
+}
+
+// Look up the table with the given name from catalog. If 
`defaultDatabase` is set, we look up
+// the table in the database `defaultDatabase`, else we follow the 
default way.
+private def lookupTableFromCatalog(
+u: UnresolvedRelation,
+defaultDatabase: Option[String] = None): LogicalPlan = {
   try {
-catalog.lookupRelation(u.tableIdentifier, u.alias)
+catalog.lookupRelation(u.tableIdentifier, u.alias, defaultDatabase)
   } catch {
 case _: NoSuchTableException =>
   u.failAnalysis(s"Table or view not found: ${u.tableName}")
   }
 }
 
-def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
-i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
-  case u: UnresolvedRelation =>
-val table = u.tableIdentifier
-if (table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
-(!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))) {
-  // If the database part is specified, and we support running SQL 
directly on files, and
-  // it's not a temporary view, and the table does not exist, then 
let's just return the
-  // original UnresolvedRelation. It is possible we are matching a 
query like "select *
-  // from parquet.`/path/to/query`". The plan will get resolved 
later.
-  // Note that we are testing (!db_exists || !table_exists) 
because the catalog throws
-  // an exception from tableExists if the database does not exist.
-  u
-} else {
-  lookupTableFromCatalog(u)
+// If the database part is specified, and we support running SQL 
directly on files, and
+// it's not a temporary view, and the table does not exist, then let's 
just return the
+// original UnresolvedRelation. It is possible we are matching a query 
like "select *
+// from parquet.`/path/to/query`". The plan will get resolved later.
+// Note that we are testing (!db_exists || !table_exists) because the 
catalog throws
+// an exception from tableExists if the database does not exist.
+private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean 
= {
+  table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
+(!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))
+}
+
+// Change the default database name if the plan is a view, and 
transformDown with the new
+// database name to resolve all UnresolvedRelations and Views.
+// If the view is defined in a DataSource other than Hive, and the 
view's child is empty,
+// set the view's child to a SimpleCatalogRelation, else throw an 
AnalysisException.
--- End diff --

```
// If the view is defined in a DataSource other than

[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-26 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93898130
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -510,32 +510,94 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the 
catalog.
*/
   object ResolveRelations extends Rule[LogicalPlan] {
-private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan 
= {
+
+def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
+i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+  case u: UnresolvedRelation => resolveRelation(u)
+}
+
+// If the unresolved relation is running directly on files, we just 
return the original
+// UnresolvedRelation, the plan will get resolved later. Else we look 
up the table from catalog
+// and change the default database name if it is a view.
+//
+// Note this is compatible with the views defined by older versions of 
Spark(before 2.2), which
+// have empty defaultDatabase and all the relations in viewText have 
database part defined.
+def resolveRelation(
+plan: LogicalPlan,
+defaultDatabase: Option[String] = None): LogicalPlan = plan match {
+  case u @ UnresolvedRelation(table: TableIdentifier, _) if 
isRunningDirectlyOnFiles(table) =>
+u
+  case u: UnresolvedRelation =>
+resolveView(lookupTableFromCatalog(u, defaultDatabase))
+}
+
+// Look up the table with the given name from catalog. If 
`defaultDatabase` is set, we look up
+// the table in the database `defaultDatabase`, else we follow the 
default way.
--- End diff --

Let's be specific about what "else we follow the default way" means here.





[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-26 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93898018
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -510,32 +510,94 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the 
catalog.
*/
   object ResolveRelations extends Rule[LogicalPlan] {
-private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan 
= {
+
+def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
+i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+  case u: UnresolvedRelation => resolveRelation(u)
+}
+
+// If the unresolved relation is running directly on files, we just 
return the original
+// UnresolvedRelation, the plan will get resolved later. Else we look 
up the table from catalog
+// and change the default database name if it is a view.
+//
+// Note this is compatible with the views defined by older versions of 
Spark(before 2.2), which
+// have empty defaultDatabase and all the relations in viewText have 
database part defined.
+def resolveRelation(
+plan: LogicalPlan,
+defaultDatabase: Option[String] = None): LogicalPlan = plan match {
--- End diff --

Let's explain defaultDatabase more, like when we need it, when it is set, 
and how it is different from the current database in the session catalog.





[GitHub] spark pull request #16388: [SPARK-18989][SQL] DESC TABLE should not fail wit...

2016-12-26 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16388#discussion_r93897706
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
---
@@ -408,8 +408,15 @@ private[hive] class HiveClientImpl(
 lastAccessTime = h.getLastAccessTime.toLong * 1000,
 storage = CatalogStorageFormat(
   locationUri = shim.getDataLocation(h),
-  inputFormat = Option(h.getInputFormatClass).map(_.getName),
-  outputFormat = Option(h.getOutputFormatClass).map(_.getName),
+  // To avoid ClassNotFound exception, we try our best to not get the format class, but get
+  // the class name directly. However, for non-native tables, there is no interface to get
+  // the format class name, so we may still throw ClassNotFound in this case.
+  inputFormat = Option(h.getTTable.getSd.getInputFormat).orElse {
+Option(h.getStorageHandler).map(_.getInputFormatClass.getName)
--- End diff --

Does this actually also fix a bug? Is it possible to add a test?





[GitHub] spark issue #16233: [SPARK-18801][SQL] Add `View` operator to help resolve a...

2016-12-26 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16233
  
> We can make ResolveRelations View aware, and make it keep track of the 
default databases (plural - in case of nested views). The default database will 
be the one of the last seen parent view. This approach makes it trivial to
limit the depth of nested views (which might be needed at some point), or we 
can make this only resolve one layer of nested views at a time and use the 
analyzer's maxIterations as an implicit limit.

Yeah, this seems like the easiest way to achieve what we need. I am good with this approach.
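
A conceptual sketch of that approach, using simplified placeholder types rather than the Catalyst classes; the only point is how the default database of the closest enclosing view flows down to unqualified relations:

```scala
// Simplified stand-ins for the real plan nodes.
sealed trait PlanNode
case class RelationRef(name: String, database: Option[String]) extends PlanNode
case class ViewNode(defaultDatabase: Option[String], child: PlanNode) extends PlanNode

// Walk the plan carrying the default database of the last seen parent view;
// unqualified relations inside a view are qualified with that database.
def propagateDefaultDb(plan: PlanNode, inherited: Option[String] = None): PlanNode =
  plan match {
    case ViewNode(db, child) =>
      // A nested view installs its own default database for its subtree.
      ViewNode(db, propagateDefaultDb(child, db.orElse(inherited)))
    case RelationRef(name, None) =>
      RelationRef(name, inherited)
    case other =>
      other
  }
```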





[GitHub] spark issue #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTableAsSelec...

2016-12-22 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/15996
  
Ah, https://github.com/apache/spark/commit/9a1ad71db44558bb6eb380dc23a1a1abbc2f3e98 failed.





[GitHub] spark issue #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTableAsSelec...

2016-12-22 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/15996
  
LGTM. Can you update the comment to address my last comment 
(https://github.com/apache/spark/pull/15996#discussion_r93730700)?





[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...

2016-12-22 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15996#discussion_r93730700
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
 ---
@@ -643,6 +644,14 @@ class DataFrameReaderWriterSuite extends QueryTest 
with SharedSQLContext with Be
 withTable("t") {
   val provider = "org.apache.spark.sql.test.DefaultSource"
   sql(s"CREATE TABLE t USING $provider")
+
+  // make sure the data source doesn't provide `InsertableRelation`, so that we can only append
+  // data to it with `CreatableRelationProvider.createRelation`
--- End diff --

One last comment. Let's explicitly say that we want to test the case where a data source is a CreatableRelationProvider but its relation does not implement InsertableRelation.
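
A sketch of what such a test source might look like; this is not the existing `org.apache.spark.sql.test.DefaultSource`, just an illustration of a `CreatableRelationProvider` whose relation deliberately does not mix in `InsertableRelation`:

```scala
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider}
import org.apache.spark.sql.types.StructType

// Hypothetical test-only data source: saveAsTable can append through
// createRelation, but the relation it returns does not implement
// InsertableRelation, so the insert-based code path cannot be used.
class NonInsertableSource extends CreatableRelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = {
    val ctx = sqlContext // avoid shadowing inside the anonymous relation
    new BaseRelation {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType = data.schema
    }
  }
}
```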





[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...

2016-12-22 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15996#discussion_r93720714
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
 ---
@@ -195,12 +195,25 @@ class PartitionProviderCompatibilitySuite
   withTempDir { dir =>
 setupPartitionedDatasourceTable("test", dir)
 if (enabled) {
-  spark.sql("msck repair table test")
+  assert(spark.table("test").count() == 0)
+} else {
+  assert(spark.table("test").count() == 5)
 }
-assert(spark.sql("select * from test").count() == 5)
-spark.range(10).selectExpr("id as fieldOne", "id as partCol")
+
+spark.range(3, 13).selectExpr("id as fieldOne", "id as 
partCol")
--- End diff --

It would be good to also explain in the comment why we use (3, 13).





[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...

2016-12-22 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15996#discussion_r93720687
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
 ---
@@ -195,12 +195,25 @@ class PartitionProviderCompatibilitySuite
   withTempDir { dir =>
 setupPartitionedDatasourceTable("test", dir)
 if (enabled) {
-  spark.sql("msck repair table test")
+  assert(spark.table("test").count() == 0)
+} else {
+  assert(spark.table("test").count() == 5)
 }
-assert(spark.sql("select * from test").count() == 5)
-spark.range(10).selectExpr("id as fieldOne", "id as partCol")
+
+spark.range(3, 13).selectExpr("id as fieldOne", "id as 
partCol")
   
.write.partitionBy("partCol").mode("append").saveAsTable("test")
-assert(spark.sql("select * from test").count() == 15)
+
+if (enabled) {
+  // Only the newly written partitions are visible, which 
means the partitions
--- End diff --

Let's also explain why we only see newly written partitions.





[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...

2016-12-22 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15996#discussion_r93720630
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
 ---
@@ -195,12 +195,25 @@ class PartitionProviderCompatibilitySuite
   withTempDir { dir =>
 setupPartitionedDatasourceTable("test", dir)
 if (enabled) {
-  spark.sql("msck repair table test")
--- End diff --

oh, we are checking the number of rows before the msck, right?





[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...

2016-12-22 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15996#discussion_r93720613
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/PartitionProviderCompatibilitySuite.scala
 ---
@@ -195,12 +195,25 @@ class PartitionProviderCompatibilitySuite
   withTempDir { dir =>
 setupPartitionedDatasourceTable("test", dir)
 if (enabled) {
-  spark.sql("msck repair table test")
--- End diff --

Why don't we need this anymore?





[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...

2016-12-22 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15996#discussion_r93720521
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/command/createDataSourceTables.scala
 ---
@@ -140,153 +140,55 @@ case class CreateDataSourceTableAsSelectCommand(
 val tableIdentWithDB = table.identifier.copy(database = Some(db))
 val tableName = tableIdentWithDB.unquotedString
 
-var createMetastoreTable = false
-// We may need to reorder the columns of the query to match the 
existing table.
-var reorderedColumns = Option.empty[Seq[NamedExpression]]
 if (sessionState.catalog.tableExists(tableIdentWithDB)) {
-  // Check if we need to throw an exception or just return.
-  mode match {
-case SaveMode.ErrorIfExists =>
-  throw new AnalysisException(s"Table $tableName already exists. " 
+
-s"If you are using saveAsTable, you can set SaveMode to 
SaveMode.Append to " +
-s"insert data into the table or set SaveMode to 
SaveMode.Overwrite to overwrite" +
-s"the existing data. " +
-s"Or, if you are using SQL CREATE TABLE, you need to drop 
$tableName first.")
-case SaveMode.Ignore =>
-  // Since the table already exists and the save mode is Ignore, 
we will just return.
-  return Seq.empty[Row]
-case SaveMode.Append =>
-  val existingTable = 
sessionState.catalog.getTableMetadata(tableIdentWithDB)
-
-  if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
-throw new AnalysisException(s"Saving data in the Hive serde 
table $tableName is " +
-  "not supported yet. Please use the insertInto() API as an 
alternative.")
-  }
-
-  // Check if the specified data source match the data source of 
the existing table.
-  val existingProvider = 
DataSource.lookupDataSource(existingTable.provider.get)
-  val specifiedProvider = 
DataSource.lookupDataSource(table.provider.get)
-  // TODO: Check that options from the resolved relation match the 
relation that we are
-  // inserting into (i.e. using the same compression).
-  if (existingProvider != specifiedProvider) {
-throw new AnalysisException(s"The format of the existing table 
$tableName is " +
-  s"`${existingProvider.getSimpleName}`. It doesn't match the 
specified format " +
-  s"`${specifiedProvider.getSimpleName}`.")
-  }
-
-  if (query.schema.length != existingTable.schema.length) {
-throw new AnalysisException(
-  s"The column number of the existing table $tableName" +
-s"(${existingTable.schema.catalogString}) doesn't match 
the data schema" +
-s"(${query.schema.catalogString})")
-  }
-
-  val resolver = sessionState.conf.resolver
-  val tableCols = existingTable.schema.map(_.name)
-
-  reorderedColumns = Some(existingTable.schema.map { f =>
-query.resolve(Seq(f.name), resolver).getOrElse {
-  val inputColumns = query.schema.map(_.name).mkString(", ")
-  throw new AnalysisException(
-s"cannot resolve '${f.name}' given input columns: 
[$inputColumns]")
-}
-  })
-
-  // In `AnalyzeCreateTable`, we verified the consistency between 
the user-specified table
-  // definition(partition columns, bucketing) and the SELECT 
query, here we also need to
-  // verify the the consistency between the user-specified table 
definition and the existing
-  // table definition.
-
-  // Check if the specified partition columns match the existing 
table.
-  val specifiedPartCols = CatalogUtils.normalizePartCols(
-tableName, tableCols, table.partitionColumnNames, resolver)
-  if (specifiedPartCols != existingTable.partitionColumnNames) {
-throw new AnalysisException(
-  s"""
-|Specified partitioning does not match that of the 
existing table $tableName.
-|Specified partition columns: 
[${specifiedPartCols.mkString(", ")}]
-|Existing partition columns: 
[${existingTable.partitionColumnNames.mkString(", ")}]
-  """.stripMargin)
-  }
-
-  // Check if the specified bucketing match the existing table.
-  val specifiedBucketSpec = tab

[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...

2016-12-22 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15996#discussion_r93720313
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -363,48 +365,125 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
   throw new AnalysisException("Cannot create hive serde table with 
saveAsTable API")
 }
 
-val tableExists = 
df.sparkSession.sessionState.catalog.tableExists(tableIdent)
-
-(tableExists, mode) match {
-  case (true, SaveMode.Ignore) =>
-// Do nothing
-
-  case (true, SaveMode.ErrorIfExists) =>
-throw new AnalysisException(s"Table $tableIdent already exists.")
-
-  case _ =>
-val existingTable = if (tableExists) {
-  
Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
-} else {
-  None
-}
-val storage = if (tableExists) {
-  existingTable.get.storage
-} else {
-  DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-}
-val tableType = if (tableExists) {
-  existingTable.get.tableType
-} else if (storage.locationUri.isDefined) {
-  CatalogTableType.EXTERNAL
-} else {
-  CatalogTableType.MANAGED
+val catalog = df.sparkSession.sessionState.catalog
+val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
+val tableIdentWithDB = tableIdent.copy(database = Some(db))
+val tableName = tableIdent.unquotedString
+
+catalog.getTableMetadataOption(tableIdent) match {
+  // If the table already exists...
+  case Some(tableMeta) =>
+mode match {
+  case SaveMode.Ignore => // Do nothing
+
+  case SaveMode.ErrorIfExists =>
+throw new AnalysisException(s"Table $tableName already exists. 
You can set SaveMode " +
+  "to SaveMode.Append to insert data into the table or set 
SaveMode to " +
+  "SaveMode.Overwrite to overwrite the existing data.")
+
+  case SaveMode.Append =>
+// Check if the specified data source match the data source of 
the existing table.
+val specifiedProvider = DataSource.lookupDataSource(source)
+// TODO: Check that options from the resolved relation match 
the relation that we are
+// inserting into (i.e. using the same compression).
+
+// Pass a table identifier with database part, so that 
`lookupRelation` won't get temp
+// views unexpectedly.
+
EliminateSubqueryAliases(catalog.lookupRelation(tableIdentWithDB)) match {
+  case l @ LogicalRelation(_: InsertableRelation | _: 
HadoopFsRelation, _, _) =>
+// check if the file formats match
+l.relation match {
+  case r: HadoopFsRelation if r.fileFormat.getClass != 
specifiedProvider =>
+throw new AnalysisException(
+  s"The file format of the existing table $tableName 
is " +
+s"`${r.fileFormat.getClass.getName}`. It doesn't 
match the specified " +
+s"format `$source`")
+  case _ =>
+}
+  case s: SimpleCatalogRelation if 
DDLUtils.isDatasourceTable(s.metadata) => // OK.
+  case c: CatalogRelation if c.catalogTable.provider == 
Some(DDLUtils.HIVE_PROVIDER) =>
+throw new AnalysisException(s"Saving data in the Hive 
serde table $tableName " +
+  s"is not supported yet. Please use the insertInto() API 
as an alternative.")
+  case o =>
+throw new AnalysisException(s"Saving data in ${o.toString} 
is not supported.")
+}
+
+val existingSchema = tableMeta.schema
+if (df.logicalPlan.schema.size != existingSchema.size) {
+  throw new AnalysisException(
+s"The column number of the existing table $tableName" +
+  s"(${existingSchema.catalogString}) doesn't match the 
data schema" +
+  s"(${df.logicalPlan.schema.catalogString})")
+}
+
+if (partitioningColumns.isDefined) {
+  logWarning("append to an existing table, the specified 
partition columns " +
+s"[${partitioningColumns.get.mkString(", ")}] will be 
ignored.")
+  

[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...

2016-12-22 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15996#discussion_r93720195
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -364,48 +366,162 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
   throw new AnalysisException("Cannot create hive serde table with 
saveAsTable API")
 }
 
-val tableExists = 
df.sparkSession.sessionState.catalog.tableExists(tableIdent)
-
-(tableExists, mode) match {
-  case (true, SaveMode.Ignore) =>
-// Do nothing
-
-  case (true, SaveMode.ErrorIfExists) =>
-throw new AnalysisException(s"Table $tableIdent already exists.")
-
-  case _ =>
-val existingTable = if (tableExists) {
-  
Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
-} else {
-  None
-}
-val storage = if (tableExists) {
-  existingTable.get.storage
-} else {
-  DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-}
-val tableType = if (tableExists) {
-  existingTable.get.tableType
-} else if (storage.locationUri.isDefined) {
-  CatalogTableType.EXTERNAL
-} else {
-  CatalogTableType.MANAGED
+val catalog = df.sparkSession.sessionState.catalog
+val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
+val tableIdentWithDB = tableIdent.copy(database = Some(db))
+val tableName = tableIdentWithDB.unquotedString
+
+catalog.getTableMetadataOption(tableIdentWithDB) match {
+  // If the table already exists...
+  case Some(existingTable) =>
+mode match {
+  case SaveMode.Ignore => // Do nothing
+
+  case SaveMode.ErrorIfExists =>
+throw new AnalysisException(s"Table $tableName already exists. 
You can set SaveMode " +
+  "to SaveMode.Append to insert data into the table or set 
SaveMode to " +
+  "SaveMode.Overwrite to overwrite the existing data.")
+
+  case SaveMode.Append =>
+if (existingTable.tableType == CatalogTableType.VIEW) {
+  throw new AnalysisException("Saving data into a view is not 
allowed.")
+}
+
+if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
+  throw new AnalysisException(s"Saving data in the Hive serde 
table $tableName is " +
+"not supported yet. Please use the insertInto() API as an 
alternative.")
+}
+
+// Check if the specified data source match the data source of 
the existing table.
+val existingProvider = 
DataSource.lookupDataSource(existingTable.provider.get)
+val specifiedProvider = DataSource.lookupDataSource(source)
+// TODO: Check that options from the resolved relation match 
the relation that we are
+// inserting into (i.e. using the same compression).
+if (existingProvider != specifiedProvider) {
+  throw new AnalysisException(s"The format of the existing 
table $tableName is " +
+s"`${existingProvider.getSimpleName}`. It doesn't match 
the specified format " +
+s"`${specifiedProvider.getSimpleName}`.")
+}
+
+if (df.schema.length != existingTable.schema.length) {
+  throw new AnalysisException(
+s"The column number of the existing table $tableName" +
+  s"(${existingTable.schema.catalogString}) doesn't match 
the data schema" +
+  s"(${df.schema.catalogString})")
+}
+
+val resolver = df.sparkSession.sessionState.conf.resolver
+val tableCols = existingTable.schema.map(_.name)
+
+// As we are inserting into an existing table, we should 
respect the existing schema and
+// adjust the column order of the given dataframe according to 
it, or throw exception
+// if the column names do not match.
+val adjustedColumns = tableCols.map { col =>
+  df.queryExecution.analyzed.resolve(Seq(col), 
resolver).getOrElse {
+val inputColumns = df.schema.map(_.name).mkString(", ")
+throw new AnalysisException(
+  s"cannot resolve '$col' given input columns: 
[$inputColumns]")
+  }
+}
+
+   

[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...

2016-12-22 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15996#discussion_r93719938
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---
@@ -364,48 +366,162 @@ final class DataFrameWriter[T] private[sql](ds: 
Dataset[T]) {
   throw new AnalysisException("Cannot create hive serde table with 
saveAsTable API")
 }
 
-val tableExists = 
df.sparkSession.sessionState.catalog.tableExists(tableIdent)
-
-(tableExists, mode) match {
-  case (true, SaveMode.Ignore) =>
-// Do nothing
-
-  case (true, SaveMode.ErrorIfExists) =>
-throw new AnalysisException(s"Table $tableIdent already exists.")
-
-  case _ =>
-val existingTable = if (tableExists) {
-  
Some(df.sparkSession.sessionState.catalog.getTableMetadata(tableIdent))
-} else {
-  None
-}
-val storage = if (tableExists) {
-  existingTable.get.storage
-} else {
-  DataSource.buildStorageFormatFromOptions(extraOptions.toMap)
-}
-val tableType = if (tableExists) {
-  existingTable.get.tableType
-} else if (storage.locationUri.isDefined) {
-  CatalogTableType.EXTERNAL
-} else {
-  CatalogTableType.MANAGED
+val catalog = df.sparkSession.sessionState.catalog
+val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
+val tableIdentWithDB = tableIdent.copy(database = Some(db))
+val tableName = tableIdentWithDB.unquotedString
+
+catalog.getTableMetadataOption(tableIdentWithDB) match {
+  // If the table already exists...
+  case Some(existingTable) =>
+mode match {
+  case SaveMode.Ignore => // Do nothing
+
+  case SaveMode.ErrorIfExists =>
+throw new AnalysisException(s"Table $tableName already exists. 
You can set SaveMode " +
+  "to SaveMode.Append to insert data into the table or set 
SaveMode to " +
+  "SaveMode.Overwrite to overwrite the existing data.")
+
+  case SaveMode.Append =>
+if (existingTable.tableType == CatalogTableType.VIEW) {
+  throw new AnalysisException("Saving data into a view is not 
allowed.")
+}
+
+if (existingTable.provider.get == DDLUtils.HIVE_PROVIDER) {
+  throw new AnalysisException(s"Saving data in the Hive serde 
table $tableName is " +
+"not supported yet. Please use the insertInto() API as an 
alternative.")
+}
+
+// Check if the specified data source match the data source of 
the existing table.
+val existingProvider = 
DataSource.lookupDataSource(existingTable.provider.get)
+val specifiedProvider = DataSource.lookupDataSource(source)
+// TODO: Check that options from the resolved relation match 
the relation that we are
+// inserting into (i.e. using the same compression).
+if (existingProvider != specifiedProvider) {
+  throw new AnalysisException(s"The format of the existing 
table $tableName is " +
+s"`${existingProvider.getSimpleName}`. It doesn't match 
the specified format " +
+s"`${specifiedProvider.getSimpleName}`.")
+}
+
+if (df.schema.length != existingTable.schema.length) {
+  throw new AnalysisException(
+s"The column number of the existing table $tableName" +
+  s"(${existingTable.schema.catalogString}) doesn't match 
the data schema" +
+  s"(${df.schema.catalogString})")
+}
+
+val resolver = df.sparkSession.sessionState.conf.resolver
+val tableCols = existingTable.schema.map(_.name)
+
+// As we are inserting into an existing table, we should 
respect the existing schema and
+// adjust the column order of the given dataframe according to 
it, or throw exception
+// if the column names do not match.
+val adjustedColumns = tableCols.map { col =>
+  df.queryExecution.analyzed.resolve(Seq(col), 
resolver).getOrElse {
+val inputColumns = df.schema.map(_.name).mkString(", ")
+throw new AnalysisException(
+  s"cannot resolve '$col' given input columns: 
[$inputColumns]")
+  }
+}
--- End diff --


[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...

2016-12-22 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15996#discussion_r93719624
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
 ---
@@ -635,4 +638,13 @@ class DataFrameReaderWriterSuite extends QueryTest 
with SharedSQLContext with Be
   checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
 }
   }
+
+  test("use saveAsTable to append to a data source table implementing 
CreatableRelationProvider") {
+withTable("t") {
+  val provider = "org.apache.spark.sql.test.DefaultSource"
--- End diff --

I am thinking that we probably should create a test data source that 
explicitly mentions its relation is not an InsertableRelation.





[GitHub] spark pull request #15996: [SPARK-18567][SQL] Simplify CreateDataSourceTable...

2016-12-22 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15996#discussion_r93718921
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
 ---
@@ -635,4 +638,13 @@ class DataFrameReaderWriterSuite extends QueryTest 
with SharedSQLContext with Be
   checkAnswer(spark.table("t"), Row(1, "a") :: Row(2, "b") :: Nil)
 }
   }
+
+  test("use saveAsTable to append to a data source table implementing 
CreatableRelationProvider") {
+withTable("t") {
+  val provider = "org.apache.spark.sql.test.DefaultSource"
--- End diff --

Is the relation created by this data source an InsertableRelation?





[GitHub] spark issue #16233: [SPARK-18801][SQL] Add `View` operator to help resolve a...

2016-12-21 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16233
  
One general comment: let's explain how this patch maintains compatibility with views defined by previous versions of Spark. It would also be good to explain this in the corresponding part of the code.





[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-21 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93551189
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 ---
@@ -549,17 +549,26 @@ class SessionCatalog(
*
* If a database is specified in `name`, this will return the table/view 
from that database.
* If no database is specified, this will first attempt to return a 
temporary table/view with
-   * the same name, then, if that does not exist, return the table/view 
from the current database.
+   * the same name, then, if that does not exist, and currentDatabase is 
defined, return the
+   * table/view from the currentDatabase, else return the table/view from 
the catalog.currentDb.
*
* Note that, the global temp view database is also valid here, this 
will return the global temp
* view matching the given name.
*
-   * If the relation is a view, the relation will be wrapped in a 
[[SubqueryAlias]] which will
-   * track the name of the view.
+   * If the relation is a view, we add a [[View]] operator over the 
relation, and wrap the logical
+   * plan in a [[SubqueryAlias]] which will track the name of the view.
+   *
+   * @param name The name of the table/view that we lookup.
+   * @param alias The alias name of the table/view that we lookup.
+   * @param currentDatabase The database name we should use to lookup the 
table/view, if the
+   *database part of [[TableIdentifier]] is not 
defined.
*/
-  def lookupRelation(name: TableIdentifier, alias: Option[String] = None): 
LogicalPlan = {
+  def lookupRelation(
+  name: TableIdentifier,
+  alias: Option[String] = None,
+  currentDatabase: Option[String] = None): LogicalPlan = {
 synchronized {
-  val db = formatDatabaseName(name.database.getOrElse(currentDb))
+  val db = 
formatDatabaseName(name.database.getOrElse(currentDatabase.getOrElse(currentDb)))
--- End diff --

I feel that currentDatabase and currentDb will confuse readers.





[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-21 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93551078
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -510,32 +510,62 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the 
catalog.
*/
   object ResolveRelations extends Rule[LogicalPlan] {
-private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan 
= {
+
+def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
+i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+  case u: UnresolvedRelation => resolveRelation(u)
+}
+
+// If the unresolved relation is running directly on files, we just 
return the original
+// UnresolvedRelation, the plan will get resolved later. Else we 
lookup the table from catalog
+// and change the current database name if it is a view.
+def resolveRelation(
+plan: LogicalPlan,
+currentDatabase: Option[String] = None): LogicalPlan = plan match {
+  case u @ UnresolvedRelation(table: TableIdentifier, _) if 
isRunningDirectlyOnFiles(table) =>
+u
+  case u: UnresolvedRelation =>
+resolveView(lookupTableFromCatalog(u, currentDatabase))
+}
+
+// Lookup the table with the given name from catalog. If 
`currentDatabase` is set, we lookup
+// the table in the database `currentDatabase`, else we follow the 
default way.
+private def lookupTableFromCatalog(
+u: UnresolvedRelation,
+currentDatabase: Option[String] = None): LogicalPlan = {
   try {
-catalog.lookupRelation(u.tableIdentifier, u.alias)
+catalog.lookupRelation(u.tableIdentifier, u.alias, currentDatabase)
   } catch {
 case _: NoSuchTableException =>
   u.failAnalysis(s"Table or view not found: ${u.tableName}")
   }
 }
 
-def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
-i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
-  case u: UnresolvedRelation =>
-val table = u.tableIdentifier
-if (table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
-(!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))) {
-  // If the database part is specified, and we support running SQL 
directly on files, and
-  // it's not a temporary view, and the table does not exist, then 
let's just return the
-  // original UnresolvedRelation. It is possible we are matching a 
query like "select *
-  // from parquet.`/path/to/query`". The plan will get resolved 
later.
-  // Note that we are testing (!db_exists || !table_exists) 
because the catalog throws
-  // an exception from tableExists if the database does not exist.
-  u
-} else {
-  lookupTableFromCatalog(u)
+// If the database part is specified, and we support running SQL 
directly on files, and
+// it's not a temporary view, and the table does not exist, then let's 
just return the
+// original UnresolvedRelation. It is possible we are matching a query 
like "select *
+// from parquet.`/path/to/query`". The plan will get resolved later.
+// Note that we are testing (!db_exists || !table_exists) because the 
catalog throws
+// an exception from tableExists if the database does not exist.
+private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean 
= {
+  table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
+(!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))
+}
+
+// Change the current database name if the plan is a view, and 
transformDown with the new
+// database name to resolve all UnresolvedRelation.
+def resolveView(plan: LogicalPlan): LogicalPlan = plan match {
+  case p @ SubqueryAlias(_, view: View, _) =>
+val currentDatabase = view.currentDatabase
+val newChild = view transform {
+  case v: View if !v.resolved =>
+resolveView(v)
+  case u: UnresolvedRelation =>
+resolveRelation(u, currentDatabase)
--- End diff --

oh, resolveRelation's second argument is the current db. 



[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-21 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93549589
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 ---
@@ -396,6 +396,20 @@ case class InsertIntoTable(
 }
 
 /**
+ * A container for holding the current database of a view and a query plan.
+ * This operator will be removed at the beginning of the optimize stage so we can see what is part
+ * of a view in an analyzed plan.
+ *
+ * @param child The logical plan of this view.
+ * @param currentDatabase The database name we use to resolve the logical 
plan.
+ */
+case class View(child: LogicalPlan, currentDatabase: Option[String]) 
extends LogicalPlan {
--- End diff --

Also, I feel `currentDatabase` may not be a good name. I am wondering if 
`defaultDatabase` is better.
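    
    A quick sketch of what the rename would look like (hypothetical; the body below
    just forwards the child's output and is not the PR's actual implementation):
    
        import org.apache.spark.sql.catalyst.expressions.Attribute
        import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}
    
        case class View(child: LogicalPlan, defaultDatabase: Option[String]) extends UnaryNode {
          override def output: Seq[Attribute] = child.output
        }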





[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-21 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93549309
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -510,32 +510,62 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the 
catalog.
*/
   object ResolveRelations extends Rule[LogicalPlan] {
-private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan 
= {
+
+def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
+i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+  case u: UnresolvedRelation => resolveRelation(u)
+}
+
+// If the unresolved relation is running directly on files, we just 
return the original
+// UnresolvedRelation, the plan will get resolved later. Else we 
lookup the table from catalog
+// and change the current database name if it is a view.
+def resolveRelation(
+plan: LogicalPlan,
+currentDatabase: Option[String] = None): LogicalPlan = plan match {
+  case u @ UnresolvedRelation(table: TableIdentifier, _) if 
isRunningDirectlyOnFiles(table) =>
+u
+  case u: UnresolvedRelation =>
+resolveView(lookupTableFromCatalog(u, currentDatabase))
+}
+
+// Lookup the table with the given name from catalog. If 
`currentDatabase` is set, we lookup
+// the table in the database `currentDatabase`, else we follow the 
default way.
+private def lookupTableFromCatalog(
--- End diff --

If so, should we just inline it?





[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-21 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93549165
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -510,32 +510,62 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the 
catalog.
*/
   object ResolveRelations extends Rule[LogicalPlan] {
-private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan 
= {
+
+def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
+i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+  case u: UnresolvedRelation => resolveRelation(u)
+}
+
+// If the unresolved relation is running directly on files, we just 
return the original
+// UnresolvedRelation, the plan will get resolved later. Else we 
lookup the table from catalog
+// and change the current database name if it is a view.
+def resolveRelation(
+plan: LogicalPlan,
+currentDatabase: Option[String] = None): LogicalPlan = plan match {
+  case u @ UnresolvedRelation(table: TableIdentifier, _) if 
isRunningDirectlyOnFiles(table) =>
+u
+  case u: UnresolvedRelation =>
+resolveView(lookupTableFromCatalog(u, currentDatabase))
+}
+
+// Lookup the table with the given name from catalog. If 
`currentDatabase` is set, we lookup
+// the table in the database `currentDatabase`, else we follow the 
default way.
+private def lookupTableFromCatalog(
+u: UnresolvedRelation,
+currentDatabase: Option[String] = None): LogicalPlan = {
   try {
-catalog.lookupRelation(u.tableIdentifier, u.alias)
+catalog.lookupRelation(u.tableIdentifier, u.alias, currentDatabase)
   } catch {
 case _: NoSuchTableException =>
   u.failAnalysis(s"Table or view not found: ${u.tableName}")
   }
 }
 
-def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
-i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
-  case u: UnresolvedRelation =>
-val table = u.tableIdentifier
-if (table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
-(!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))) {
-  // If the database part is specified, and we support running SQL 
directly on files, and
-  // it's not a temporary view, and the table does not exist, then 
let's just return the
-  // original UnresolvedRelation. It is possible we are matching a 
query like "select *
-  // from parquet.`/path/to/query`". The plan will get resolved 
later.
-  // Note that we are testing (!db_exists || !table_exists) 
because the catalog throws
-  // an exception from tableExists if the database does not exist.
-  u
-} else {
-  lookupTableFromCatalog(u)
+// If the database part is specified, and we support running SQL 
directly on files, and
+// it's not a temporary view, and the table does not exist, then let's 
just return the
+// original UnresolvedRelation. It is possible we are matching a query 
like "select *
+// from parquet.`/path/to/query`". The plan will get resolved later.
+// Note that we are testing (!db_exists || !table_exists) because the 
catalog throws
+// an exception from tableExists if the database does not exist.
+private def isRunningDirectlyOnFiles(table: TableIdentifier): Boolean 
= {
+  table.database.isDefined && conf.runSQLonFile && 
!catalog.isTemporaryTable(table) &&
+(!catalog.databaseExists(table.database.get) || 
!catalog.tableExists(table))
+}
+
+// Change the current database name if the plan is a view, and 
transformDown with the new
+// database name to resolve all UnresolvedRelation.
+def resolveView(plan: LogicalPlan): LogicalPlan = plan match {
+  case p @ SubqueryAlias(_, view: View, _) =>
+val currentDatabase = view.currentDatabase
+val newChild = view transform {
+  case v: View if !v.resolved =>
+resolveView(v)
+  case u: UnresolvedRelation =>
+resolveRelation(u, currentDatabase)
--- End diff --

Is this right? `u` can also have a db name, right?

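    For concreteness, the case the question is getting at (all names are
    hypothetical): a view created in db1 whose text reads from an explicitly
    qualified table in another database, e.g. CREATE VIEW db1.v AS SELECT * FROM
    db2.tbl. Since lookupRelation resolves the database with
    name.database.getOrElse(...), the explicit db2 should still win over the
    view's default database:
    
        val viewDefaultDb: Option[String] = Some("db1")  // database the view was created in
        val relationDb: Option[String]    = Some("db2")  // explicit qualifier inside the view text
        val sessionCurrentDb              = "default"
    
        // Mirrors name.database.getOrElse(currentDatabase.getOrElse(currentDb)).
        val resolved = relationDb.orElse(viewDefaultDb).getOrElse(sessionCurrentDb)
        assert(resolved == "db2")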


[GitHub] spark pull request #16233: [SPARK-18801][SQL] Add `View` operator to help re...

2016-12-21 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16233#discussion_r93548847
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 ---
@@ -510,32 +510,62 @@ class Analyzer(
* Replaces [[UnresolvedRelation]]s with concrete relations from the 
catalog.
*/
   object ResolveRelations extends Rule[LogicalPlan] {
-private def lookupTableFromCatalog(u: UnresolvedRelation): LogicalPlan 
= {
+
+def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
+  case i @ InsertIntoTable(u: UnresolvedRelation, parts, child, _, _) 
if child.resolved =>
+i.copy(table = EliminateSubqueryAliases(lookupTableFromCatalog(u)))
+  case u: UnresolvedRelation => resolveRelation(u)
+}
+
+// If the unresolved relation is running directly on files, we just 
return the original
+// UnresolvedRelation, the plan will get resolved later. Else we 
lookup the table from catalog
+// and change the current database name if it is a view.
+def resolveRelation(
+plan: LogicalPlan,
+currentDatabase: Option[String] = None): LogicalPlan = plan match {
+  case u @ UnresolvedRelation(table: TableIdentifier, _) if 
isRunningDirectlyOnFiles(table) =>
+u
+  case u: UnresolvedRelation =>
+resolveView(lookupTableFromCatalog(u, currentDatabase))
+}
+
+// Lookup the table with the given name from catalog. If 
`currentDatabase` is set, we lookup
+// the table in the database `currentDatabase`, else we follow the 
default way.
+private def lookupTableFromCatalog(
--- End diff --

Is this function only used once?





[GitHub] spark issue #16357: [SPARK-18928][branch-2.0]Check TaskContext.isInterrupted...

2016-12-21 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16357
  
@mridulm ok. I merged this because it is a backport (the original patch has 
already been merged to 2.1 and master) and I believe Josh has already addressed 
your concerns. If you want us to hold the merge, it would be good to explicitly 
mention it next time. Thanks!






spark git commit: [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranamer to 2.6

2016-12-21 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master b7650f11c -> 1a6438897


[SPARK-18951] Upgrade com.thoughtworks.paranamer/paranamer to 2.6

## What changes were proposed in this pull request?
I recently hit a bug in com.thoughtworks.paranamer/paranamer, which causes 
jackson to fail to handle a byte array defined in a case class. Then I found 
https://github.com/FasterXML/jackson-module-scala/issues/48, which suggests 
that it is caused by a bug in paranamer. Let's upgrade paranamer. Since we are 
using jackson 2.6.5, and jackson-module-paranamer 2.6.5 uses 
com.thoughtworks.paranamer/paranamer 2.6, I suggest that we upgrade paranamer 
to 2.6.

Author: Yin Huai <yh...@databricks.com>

Closes #16359 from yhuai/SPARK-18951.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1a643889
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1a643889
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1a643889

Branch: refs/heads/master
Commit: 1a64388973711b4e567f25fa33d752066a018b49
Parents: b7650f1
Author: Yin Huai <yh...@databricks.com>
Authored: Wed Dec 21 09:26:13 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Wed Dec 21 09:26:13 2016 -0800

--
 dev/deps/spark-deps-hadoop-2.2 | 2 +-
 dev/deps/spark-deps-hadoop-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-2.4 | 2 +-
 dev/deps/spark-deps-hadoop-2.6 | 2 +-
 dev/deps/spark-deps-hadoop-2.7 | 2 +-
 pom.xml| 7 ++-
 6 files changed, 11 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.2
--
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index afbdae0..9cbab3d8 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -128,7 +128,7 @@ objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
-paranamer-2.3.jar
+paranamer-2.6.jar
 parquet-column-1.8.1.jar
 parquet-common-1.8.1.jar
 parquet-encoding-1.8.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.3
--
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index adf3863..63ce6c6 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -135,7 +135,7 @@ objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
-paranamer-2.3.jar
+paranamer-2.6.jar
 parquet-column-1.8.1.jar
 parquet-common-1.8.1.jar
 parquet-encoding-1.8.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.4
--
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index 88e6b3f..122d5c2 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -135,7 +135,7 @@ objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
-paranamer-2.3.jar
+paranamer-2.6.jar
 parquet-column-1.8.1.jar
 parquet-common-1.8.1.jar
 parquet-encoding-1.8.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 15c5d9f..776aabd 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -143,7 +143,7 @@ objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
-paranamer-2.3.jar
+paranamer-2.6.jar
 parquet-column-1.8.1.jar
 parquet-common-1.8.1.jar
 parquet-encoding-1.8.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/dev/deps/spark-deps-hadoop-2.7
--
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index 77fb537..524e824 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -144,7 +144,7 @@ objenesis-2.1.jar
 opencsv-2.3.jar
 oro-2.0.8.jar
 osgi-resource-locator-1.0.1.jar
-paranamer-2.3.jar
+paranamer-2.6.jar
 parquet-column-1.8.1.jar
 parquet-common-1.8.1.jar
 parquet-encoding-1.8.1.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/1a643889/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 4f12085..72e5442 100644
--- a/pom.xml
+++ b/pom.xml
@@ -179,7 +179,7 @@
 4.5.3
 1.1
 2.52.0
-2.8
+2.6
 1.8
 1.0.0
 
@@ -1863,6 +1863,11 @@
   
 
   
+  
+com.thoughtworks.paranamer
+paranamer
+  

[GitHub] spark issue #16359: [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranam...

2016-12-21 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16359
  
Thanks! I will get this in master.





[GitHub] spark issue #16359: [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranam...

2016-12-20 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16359
  
test this please





spark git commit: [SPARK-18928][BRANCH-2.0] Check TaskContext.isInterrupted() in FileScanRDD, JDBCRDD & UnsafeSorter

2016-12-20 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 678d91c1d -> 2aae220b5


[SPARK-18928][BRANCH-2.0] Check TaskContext.isInterrupted() in FileScanRDD, 
JDBCRDD & UnsafeSorter

This is a branch-2.0 backport of #16340; the original description follows:

## What changes were proposed in this pull request?

In order to respond to task cancellation, Spark tasks must periodically check 
`TaskContext.isInterrupted()`, but this check is missing on a few critical read 
paths used in Spark SQL, including `FileScanRDD`, `JDBCRDD`, and 
UnsafeSorter-based sorts. This can cause interrupted / cancelled tasks to 
continue running and become zombies (as also described in #16189).

This patch aims to fix this problem by adding `TaskContext.isInterrupted()` 
checks to these paths. Note that I could have used `InterruptibleIterator` to 
simply wrap a bunch of iterators but in some cases this would have an adverse 
performance penalty or might not be effective due to certain special uses of 
Iterators in Spark SQL. Instead, I inlined `InterruptibleIterator`-style logic 
into existing iterator subclasses.
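
As a rough illustration of the trade-off described above (a sketch only, not
code from the patch; the patch inlines the check into each affected iterator
rather than adding helpers like these):

    import org.apache.spark.{InterruptibleIterator, TaskContext, TaskKilledException}

    // Option 1: wrap an existing iterator, which adds an extra layer per element.
    def wrapped[T](iter: Iterator[T]): Iterator[T] =
      new InterruptibleIterator[T](TaskContext.get(), iter)

    // Option 2: the same check, written so it can be inlined into an iterator's
    // own next()/loadNext() method.
    def throwIfInterrupted(): Unit = {
      val ctx = TaskContext.get()
      if (ctx != null && ctx.isInterrupted()) {
        throw new TaskKilledException
      }
    }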

## How was this patch tested?

Tested manually in `spark-shell` with two different reproductions of 
non-cancellable tasks, one involving scans of huge files and another involving 
sort-merge joins that spill to disk. Both causes of zombie tasks are fixed by 
the changes added here.

Author: Josh Rosen 

Closes #16357 from JoshRosen/sql-task-interruption-branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2aae220b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2aae220b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2aae220b

Branch: refs/heads/branch-2.0
Commit: 2aae220b536065f55b2cf644a2a223aab0d051d0
Parents: 678d91c
Author: Josh Rosen 
Authored: Tue Dec 20 16:05:04 2016 -0800
Committer: Yin Huai 
Committed: Tue Dec 20 16:05:04 2016 -0800

--
 .../collection/unsafe/sort/UnsafeInMemorySorter.java| 11 +++
 .../collection/unsafe/sort/UnsafeSorterSpillReader.java | 11 +++
 .../spark/sql/execution/datasources/FileScanRDD.scala   | 12 ++--
 .../spark/sql/execution/datasources/jdbc/JDBCRDD.scala  |  9 -
 4 files changed, 40 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2aae220b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
index b517371..2bd756f 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeInMemorySorter.java
@@ -21,6 +21,8 @@ import java.util.Comparator;
 
 import org.apache.avro.reflect.Nullable;
 
+import org.apache.spark.TaskContext;
+import org.apache.spark.TaskKilledException;
 import org.apache.spark.memory.MemoryConsumer;
 import org.apache.spark.memory.TaskMemoryManager;
 import org.apache.spark.unsafe.Platform;
@@ -226,6 +228,7 @@ public final class UnsafeInMemorySorter {
 private long keyPrefix;
 private int recordLength;
 private long currentPageNumber;
+private final TaskContext taskContext = TaskContext.get();
 
 private SortedIterator(int numRecords, int offset) {
   this.numRecords = numRecords;
@@ -256,6 +259,14 @@ public final class UnsafeInMemorySorter {
 
 @Override
 public void loadNext() {
+  // Kill the task in case it has been marked as killed. This logic is from
+  // InterruptibleIterator, but we inline it here instead of wrapping the 
iterator in order
+  // to avoid performance overhead. This check is added here in 
`loadNext()` instead of in
+  // `hasNext()` because it's technically possible for the caller to be 
relying on
+  // `getNumRecords()` instead of `hasNext()` to know when to stop.
+  if (taskContext != null && taskContext.isInterrupted()) {
+throw new TaskKilledException();
+  }
   // This pointer points to a 4-byte record length, followed by the 
record's bytes
   final long recordPointer = array.get(offset + position);
   currentPageNumber = TaskMemoryManager.decodePageNumber(recordPointer);

http://git-wip-us.apache.org/repos/asf/spark/blob/2aae220b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeSorterSpillReader.java
--
diff --git 

[GitHub] spark issue #16357: [SPARK-18928][branch-2.0]Check TaskContext.isInterrupted...

2016-12-20 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16357
  
Merging to branch 2.0.





spark git commit: [SPARK-18761][BRANCH-2.0] Introduce "task reaper" to oversee task killing in executors

2016-12-20 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1f0c5fa75 -> 678d91c1d


[SPARK-18761][BRANCH-2.0] Introduce "task reaper" to oversee task killing in 
executors

Branch-2.0 backport of #16189; original description follows:

## What changes were proposed in this pull request?

Spark's current task cancellation / task killing mechanism is "best effort" 
because some tasks may not be interruptible or may not respond to their 
"killed" flags being set. If a significant fraction of a cluster's task slots 
are occupied by tasks that have been marked as killed but remain running then 
this can lead to a situation where new jobs and tasks are starved of resources 
that are being used by these zombie tasks.

This patch aims to address this problem by adding a "task reaper" mechanism to 
executors. At a high-level, task killing now launches a new thread which 
attempts to kill the task and then watches the task and periodically checks 
whether it has been killed. The TaskReaper will periodically re-attempt to call 
`TaskRunner.kill()` and will log warnings if the task keeps running. I modified 
TaskRunner to rename its thread at the start of the task, allowing TaskReaper 
to take a thread dump and filter it in order to log stacktraces from the exact 
task thread that we are waiting to finish. If the task has not stopped after a 
configurable timeout then the TaskReaper will throw an exception to trigger 
executor JVM death, thereby forcibly freeing any resources consumed by the 
zombie tasks.

This feature is flagged off by default and is controlled by four new 
configurations under the `spark.task.reaper.*` namespace. See the updated 
`configuration.md` doc for details.
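
A usage sketch of those knobs (illustrative values; the interval and timeout
shown here match the TaskReaper defaults quoted later in this digest):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.task.reaper.enabled", "true")         // the feature is off by default
      .set("spark.task.reaper.pollingInterval", "10s")  // how often a killed task is re-checked
      .set("spark.task.reaper.threadDump", "true")      // log a thread dump while waiting
      .set("spark.task.reaper.killTimeout", "2m")       // escalate to killing the executor JVM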

## How was this patch tested?

Tested via a new test case in `JobCancellationSuite`, plus manual testing.

Author: Josh Rosen 

Closes #16358 from JoshRosen/cancellation-branch-2.0.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/678d91c1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/678d91c1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/678d91c1

Branch: refs/heads/branch-2.0
Commit: 678d91c1d2283d9965a39656af9d383bad093ba8
Parents: 1f0c5fa
Author: Josh Rosen 
Authored: Tue Dec 20 15:56:56 2016 -0800
Committer: Yin Huai 
Committed: Tue Dec 20 15:56:56 2016 -0800

--
 .../org/apache/spark/executor/Executor.scala| 169 ++-
 .../scala/org/apache/spark/util/Utils.scala |  26 ++-
 .../org/apache/spark/JobCancellationSuite.scala |  77 +
 docs/configuration.md   |  42 +
 4 files changed, 300 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/678d91c1/core/src/main/scala/org/apache/spark/executor/Executor.scala
--
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9a017f2..93e994b 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -84,6 +84,16 @@ private[spark] class Executor(
   // Start worker thread pool
   private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor 
task launch worker")
   private val executorSource = new ExecutorSource(threadPool, executorId)
+  // Pool used for threads that supervise task killing / cancellation
+  private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task 
reaper")
+  // For tasks which are in the process of being killed, this map holds the 
most recently created
+  // TaskReaper. All accesses to this map should be synchronized on the map 
itself (this isn't
+  // a ConcurrentHashMap because we use the synchronization for purposes other 
than simply guarding
+  // the integrity of the map's internal state). The purpose of this map is to 
prevent the creation
+  // of a separate TaskReaper for every killTask() of a given task. Instead, 
this map allows us to
+  // track whether an existing TaskReaper fulfills the role of a TaskReaper 
that we would otherwise
+  // create. The map key is a task id.
+  private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, 
TaskReaper]()
 
   if (!isLocal) {
 env.metricsSystem.registerSource(executorSource)
@@ -93,6 +103,9 @@ private[spark] class Executor(
   // Whether to load classes in user jars before those in Spark jars
   private val userClassPathFirst = 
conf.getBoolean("spark.executor.userClassPathFirst", false)
 
+  // Whether to monitor killed / interrupted tasks
+  private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", 
false)
+
   // Create our 

[GitHub] spark issue #16358: [SPARK-18761][branch-2.0] Introduce "task reaper" to ove...

2016-12-20 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16358
  
LGTM. Merging to branch-2.0





[GitHub] spark issue #16359: [SPARK-18951] Upgrade com.thoughtworks.paranamer/paranam...

2016-12-20 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16359
  
@srowen @JoshRosen  for review





[GitHub] spark pull request #16359: [SPARK-18951] Upgrade com.thoughtworks.paranamer/...

2016-12-20 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16359#discussion_r93334655
  
--- Diff: pom.xml ---
@@ -179,7 +179,7 @@
 4.5.3
 1.1
 2.52.0
-2.8
+2.6
--- End diff --

Although we defined the version as 2.8, that variable was not actually used, 
so we have been using 2.3.





[GitHub] spark pull request #16359: [SPARK-18951] Upgrade com.thoughtworks.paranamer/...

2016-12-20 Thread yhuai
GitHub user yhuai opened a pull request:

https://github.com/apache/spark/pull/16359

[SPARK-18951] Upgrade com.thoughtworks.paranamer/paranamer to 2.6

## What changes were proposed in this pull request?
I recently hit a bug in com.thoughtworks.paranamer/paranamer, which causes 
jackson to fail to handle a byte array defined in a case class. Then I found 
https://github.com/FasterXML/jackson-module-scala/issues/48, which suggests 
that it is caused by a bug in paranamer. Let's upgrade paranamer. Since we are 
using jackson 2.6.5, and jackson-module-paranamer 2.6.5 uses 
com.thoughtworks.paranamer/paranamer 2.6, I suggest that we upgrade paranamer 
to 2.6.
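
A minimal reproduction sketch of the kind of code that hits this (hypothetical
class and values, not taken from the patch): a case class with an Array[Byte]
field round-tripped through jackson-module-scala, which relies on paranamer for
constructor parameter names.

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule

    case class Record(name: String, payload: Array[Byte])

    val mapper = new ObjectMapper()
    mapper.registerModule(DefaultScalaModule)

    val json = mapper.writeValueAsString(Record("a", Array[Byte](1, 2, 3)))
    // With the buggy paranamer version this round trip can fail; with 2.6 it
    // deserializes as expected.
    val back = mapper.readValue(json, classOf[Record])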

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yhuai/spark SPARK-18951

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16359.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16359


commit c502aeb123641f634c107c0ad8c0f1986fea8ee1
Author: Yin Huai <yh...@databricks.com>
Date:   2016-12-20T21:58:42Z

[SPARK-18951] Upgrade com.thoughtworks.paranamer/paranamer to 2.6

I recently hit a bug in com.thoughtworks.paranamer/paranamer, which causes 
jackson to fail to handle a byte array defined in a case class. Then I found 
https://github.com/FasterXML/jackson-module-scala/issues/48, which suggests 
that it is caused by a bug in paranamer. Let's upgrade paranamer. Since we are 
using jackson 2.6.5, and jackson-module-paranamer 2.6.5 uses 
com.thoughtworks.paranamer/paranamer 2.6, I suggest that we upgrade paranamer 
to 2.6.







[GitHub] spark issue #16357: [SPARK-18928][branch-2.0]Check TaskContext.isInterrupted...

2016-12-20 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16357
  
LGTM 





[GitHub] spark pull request #16330: [SPARK-18817][SPARKR][SQL] change derby log outpu...

2016-12-19 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16330#discussion_r93176308
  
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala 
---
@@ -104,6 +104,12 @@ class SparkHadoopUtil extends Logging {
   }
   val bufferSize = conf.get("spark.buffer.size", "65536")
   hadoopConf.set("io.file.buffer.size", bufferSize)
+
+  if (conf.contains("spark.sql.default.derby.dir")) {
--- End diff --

Why do we need to introduce this flag?





[GitHub] spark issue #16189: [SPARK-18761][CORE] Introduce "task reaper" to oversee t...

2016-12-19 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16189
  
@mridulm Sure. Also, please feel free to leave more comments :) 





spark git commit: [SPARK-18761][CORE] Introduce "task reaper" to oversee task killing in executors

2016-12-19 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 5857b9ac2 -> fa829ce21


[SPARK-18761][CORE] Introduce "task reaper" to oversee task killing in executors

## What changes were proposed in this pull request?

Spark's current task cancellation / task killing mechanism is "best effort" 
because some tasks may not be interruptible or may not respond to their 
"killed" flags being set. If a significant fraction of a cluster's task slots 
are occupied by tasks that have been marked as killed but remain running then 
this can lead to a situation where new jobs and tasks are starved of resources 
that are being used by these zombie tasks.

This patch aims to address this problem by adding a "task reaper" mechanism to 
executors. At a high-level, task killing now launches a new thread which 
attempts to kill the task and then watches the task and periodically checks 
whether it has been killed. The TaskReaper will periodically re-attempt to call 
`TaskRunner.kill()` and will log warnings if the task keeps running. I modified 
TaskRunner to rename its thread at the start of the task, allowing TaskReaper 
to take a thread dump and filter it in order to log stacktraces from the exact 
task thread that we are waiting to finish. If the task has not stopped after a 
configurable timeout then the TaskReaper will throw an exception to trigger 
executor JVM death, thereby forcibly freeing any resources consumed by the 
zombie tasks.

This feature is flagged off by default and is controlled by four new 
configurations under the `spark.task.reaper.*` namespace. See the updated 
`configuration.md` doc for details.

## How was this patch tested?

Tested via a new test case in `JobCancellationSuite`, plus manual testing.

Author: Josh Rosen 

Closes #16189 from JoshRosen/cancellation.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fa829ce2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fa829ce2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fa829ce2

Branch: refs/heads/master
Commit: fa829ce21fb84028d90b739a49c4ece70a17ccfd
Parents: 5857b9a
Author: Josh Rosen 
Authored: Mon Dec 19 18:43:59 2016 -0800
Committer: Yin Huai 
Committed: Mon Dec 19 18:43:59 2016 -0800

--
 .../org/apache/spark/executor/Executor.scala| 169 ++-
 .../scala/org/apache/spark/util/Utils.scala |  56 +++---
 .../org/apache/spark/JobCancellationSuite.scala |  77 +
 docs/configuration.md   |  42 +
 4 files changed, 316 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fa829ce2/core/src/main/scala/org/apache/spark/executor/Executor.scala
--
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala 
b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index 9501dd9..3346f6d 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -84,6 +84,16 @@ private[spark] class Executor(
   // Start worker thread pool
   private val threadPool = ThreadUtils.newDaemonCachedThreadPool("Executor 
task launch worker")
   private val executorSource = new ExecutorSource(threadPool, executorId)
+  // Pool used for threads that supervise task killing / cancellation
+  private val taskReaperPool = ThreadUtils.newDaemonCachedThreadPool("Task 
reaper")
+  // For tasks which are in the process of being killed, this map holds the 
most recently created
+  // TaskReaper. All accesses to this map should be synchronized on the map 
itself (this isn't
+  // a ConcurrentHashMap because we use the synchronization for purposes other 
than simply guarding
+  // the integrity of the map's internal state). The purpose of this map is to 
prevent the creation
+  // of a separate TaskReaper for every killTask() of a given task. Instead, 
this map allows us to
+  // track whether an existing TaskReaper fulfills the role of a TaskReaper 
that we would otherwise
+  // create. The map key is a task id.
+  private val taskReaperForTask: HashMap[Long, TaskReaper] = HashMap[Long, 
TaskReaper]()
 
   if (!isLocal) {
 env.metricsSystem.registerSource(executorSource)
@@ -93,6 +103,9 @@ private[spark] class Executor(
   // Whether to load classes in user jars before those in Spark jars
   private val userClassPathFirst = 
conf.getBoolean("spark.executor.userClassPathFirst", false)
 
+  // Whether to monitor killed / interrupted tasks
+  private val taskReaperEnabled = conf.getBoolean("spark.task.reaper.enabled", 
false)
+
   // Create our ClassLoader
   // do this after SparkEnv creation so can access the SecurityManager
   

[GitHub] spark issue #16189: [SPARK-18761][CORE] Introduce "task reaper" to oversee t...

2016-12-19 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16189
  
LGTM!





[GitHub] spark issue #16189: [SPARK-18761][CORE] Introduce "task reaper" to oversee t...

2016-12-19 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16189
  
Thank you for those comments. I am merging this to master.





[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...

2016-12-19 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r93162832
  
--- Diff: core/src/test/scala/org/apache/spark/JobCancellationSuite.scala 
---
@@ -209,6 +209,83 @@ class JobCancellationSuite extends SparkFunSuite with 
Matchers with BeforeAndAft
 assert(jobB.get() === 100)
   }
 
+  test("task reaper kills JVM if killed tasks keep running for too long") {
+val conf = new SparkConf()
+  .set("spark.task.reaper.enabled", "true")
+  .set("spark.task.reaper.killTimeout", "5s")
+sc = new SparkContext("local-cluster[2,1,1024]", "test", conf)
+
+// Add a listener to release the semaphore once any tasks are launched.
+val sem = new Semaphore(0)
+sc.addSparkListener(new SparkListener {
+  override def onTaskStart(taskStart: SparkListenerTaskStart) {
+sem.release()
+  }
+})
+
+// jobA is the one to be cancelled.
+val jobA = Future {
+  sc.setJobGroup("jobA", "this is a job to be cancelled", 
interruptOnCancel = true)
+  sc.parallelize(1 to 1, 2).map { i =>
+while (true) { }
+  }.count()
+}
+
+// Block until both tasks of job A have started and cancel job A.
+sem.acquire(2)
+// Small delay to ensure tasks actually start executing the task body
+Thread.sleep(1000)
+
+sc.clearJobGroup()
+val jobB = sc.parallelize(1 to 100, 2).countAsync()
+sc.cancelJobGroup("jobA")
+val e = intercept[SparkException] { ThreadUtils.awaitResult(jobA, 
15.seconds) }.getCause
+assert(e.getMessage contains "cancel")
+
+// Once A is cancelled, job B should finish fairly quickly.
+assert(ThreadUtils.awaitResult(jobB, 60.seconds) === 100)
+  }
+
+  test("task reaper will not kill JVM if spark.task.killTimeout == -1") {
+val conf = new SparkConf()
+  .set("spark.task.reaper.enabled", "true")
+  .set("spark.task.reaper.killTimeout", "-1")
+  .set("spark.task.reaper.PollingInterval", "1s")
+  .set("spark.deploy.maxExecutorRetries", "1")
--- End diff --

We set it to 1 to make sure that we do not kill the JVM, right? (If we killed 
the JVM, the application would be removed because spark.deploy.maxExecutorRetries 
is 1.)





spark git commit: [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase

2016-12-19 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 fc1b25660 -> c1a26b458


[SPARK-18921][SQL] check database existence with Hive.databaseExists instead of 
getDatabase

## What changes were proposed in this pull request?

It's weird that we use `Hive.getDatabase` to check the existence of a database, 
while Hive has a `databaseExists` interface.

What's worse, `Hive.getDatabase` will produce an error message if the database 
doesn't exist, which is annoying when we only want to check the database 
existence.

This PR fixes this and uses `Hive.databaseExists` to check database existence.

## How was this patch tested?

N/A

Author: Wenchen Fan 

Closes #16332 from cloud-fan/minor.

(cherry picked from commit 7a75ee1c9224aa5c2e954fe2a71f9ad506f6782b)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c1a26b45
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c1a26b45
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c1a26b45

Branch: refs/heads/branch-2.1
Commit: c1a26b458dd353be3ab1a2b3f9bb80809cf63479
Parents: fc1b256
Author: Wenchen Fan 
Authored: Mon Dec 19 11:42:59 2016 -0800
Committer: Yin Huai 
Committed: Mon Dec 19 11:43:55 2016 -0800

--
 .../apache/spark/sql/hive/HiveExternalCatalog.scala|  2 +-
 .../org/apache/spark/sql/hive/client/HiveClient.scala  |  8 +++-
 .../apache/spark/sql/hive/client/HiveClientImpl.scala  | 12 
 .../apache/spark/sql/hive/client/VersionsSuite.scala   | 13 +++--
 4 files changed, 19 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c1a26b45/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f67ddc9..f321c45 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -167,7 +167,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
   }
 
   override def databaseExists(db: String): Boolean = withClient {
-client.getDatabaseOption(db).isDefined
+client.databaseExists(db)
   }
 
   override def listDatabases(): Seq[String] = withClient {

http://git-wip-us.apache.org/repos/asf/spark/blob/c1a26b45/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 8e7c871..0be5b0b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -58,12 +58,10 @@ private[hive] trait HiveClient {
   def setCurrentDatabase(databaseName: String): Unit
 
   /** Returns the metadata for specified database, throwing an exception if it 
doesn't exist */
-  final def getDatabase(name: String): CatalogDatabase = {
-getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name))
-  }
+  def getDatabase(name: String): CatalogDatabase
 
-  /** Returns the metadata for a given database, or None if it doesn't exist. 
*/
-  def getDatabaseOption(name: String): Option[CatalogDatabase]
+  /** Return whether a table/view with the specified name exists. */
+  def databaseExists(dbName: String): Boolean
 
   /** List the names of all the databases that match the specified pattern. */
   def listDatabases(pattern: String): Seq[String]

http://git-wip-us.apache.org/repos/asf/spark/blob/c1a26b45/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index db73596..e0f7156 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -300,7 +300,7 @@ private[hive] class HiveClientImpl(
   }
 
   override def setCurrentDatabase(databaseName: String): Unit = withHiveState {
-if (getDatabaseOption(databaseName).isDefined) {
+if (databaseExists(databaseName)) {
   state.setCurrentDatabase(databaseName)
 } else {
   throw new 

spark git commit: [SPARK-18921][SQL] check database existence with Hive.databaseExists instead of getDatabase

2016-12-19 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 24482858e -> 7a75ee1c9


[SPARK-18921][SQL] check database existence with Hive.databaseExists instead of 
getDatabase

## What changes were proposed in this pull request?

It's weird that we use `Hive.getDatabase` to check the existence of a database, 
while Hive has a `databaseExists` interface.

What's worse, `Hive.getDatabase` will produce an error message if the database 
doesn't exist, which is annoying when we only want to check the database 
existence.

This PR fixes this and uses `Hive.databaseExists` to check database existence.

## How was this patch tested?

N/A

Author: Wenchen Fan 

Closes #16332 from cloud-fan/minor.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a75ee1c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a75ee1c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a75ee1c

Branch: refs/heads/master
Commit: 7a75ee1c9224aa5c2e954fe2a71f9ad506f6782b
Parents: 2448285
Author: Wenchen Fan 
Authored: Mon Dec 19 11:42:59 2016 -0800
Committer: Yin Huai 
Committed: Mon Dec 19 11:42:59 2016 -0800

--
 .../apache/spark/sql/hive/HiveExternalCatalog.scala|  2 +-
 .../org/apache/spark/sql/hive/client/HiveClient.scala  |  8 +++-
 .../apache/spark/sql/hive/client/HiveClientImpl.scala  | 12 
 .../apache/spark/sql/hive/client/VersionsSuite.scala   | 13 +++--
 4 files changed, 19 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7a75ee1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 544f277..9c19a0e 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -167,7 +167,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
   }
 
   override def databaseExists(db: String): Boolean = withClient {
-client.getDatabaseOption(db).isDefined
+client.databaseExists(db)
   }
 
   override def listDatabases(): Seq[String] = withClient {

http://git-wip-us.apache.org/repos/asf/spark/blob/7a75ee1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
index 837b6c5..8bdcf31 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClient.scala
@@ -58,12 +58,10 @@ private[hive] trait HiveClient {
   def setCurrentDatabase(databaseName: String): Unit
 
   /** Returns the metadata for specified database, throwing an exception if it 
doesn't exist */
-  final def getDatabase(name: String): CatalogDatabase = {
-getDatabaseOption(name).getOrElse(throw new NoSuchDatabaseException(name))
-  }
+  def getDatabase(name: String): CatalogDatabase
 
-  /** Returns the metadata for a given database, or None if it doesn't exist. 
*/
-  def getDatabaseOption(name: String): Option[CatalogDatabase]
+  /** Return whether a table/view with the specified name exists. */
+  def databaseExists(dbName: String): Boolean
 
   /** List the names of all the databases that match the specified pattern. */
   def listDatabases(pattern: String): Seq[String]

http://git-wip-us.apache.org/repos/asf/spark/blob/7a75ee1c/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index b75f6e9..bacae8a 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -300,7 +300,7 @@ private[hive] class HiveClientImpl(
   }
 
   override def setCurrentDatabase(databaseName: String): Unit = withHiveState {
-if (getDatabaseOption(databaseName).isDefined) {
+if (databaseExists(databaseName)) {
   state.setCurrentDatabase(databaseName)
 } else {
   throw new NoSuchDatabaseException(databaseName)
@@ -336,14 +336,18 @@ private[hive] class HiveClientImpl(
 

[GitHub] spark issue #16332: [SPARK-18921][SQL] check database existence with Hive.da...

2016-12-19 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16332
  
LGTM. Merging to master and branch 2.1.





[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...

2016-12-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r92905791
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +465,93 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the 
interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it 
finishes.
+   */
+  private class TaskReaper(
--- End diff --

It will be good to explain more about how this class works. It is great 
that we have comments at specific parts, but a section that explains the 
workflow would be very helpful to future readers.





[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...

2016-12-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r92905622
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +465,93 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the 
interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it 
finishes.
+   */
+  private class TaskReaper(
+  taskRunner: TaskRunner,
+  val interruptThread: Boolean)
+extends Runnable {
+
+private[this] val taskId: Long = taskRunner.taskId
+
+private[this] val killPollingIntervalMs: Long =
+  conf.getTimeAsMs("spark.task.reaper.pollingInterval", "10s")
+
+private[this] val killTimeoutMs: Long = 
conf.getTimeAsMs("spark.task.reaper.killTimeout", "2m")
+
+private[this] val takeThreadDump: Boolean =
+  conf.getBoolean("spark.task.reaper.threadDump", true)
+
+override def run(): Unit = {
+  val startTimeMs = System.currentTimeMillis()
+  def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+  def timeoutExceeded(): Boolean = killTimeoutMs > 0 && elapsedTimeMs 
> killTimeoutMs
+  try {
+// Only attempt to kill the task once. If interruptThread = false 
then a second kill
+// attempt would be a no-op and if interruptThread = true then it 
may not be safe or
+// effective to interrupt multiple times:
+taskRunner.kill(interruptThread = interruptThread)
+// Monitor the killed task until it exits:
+var finished: Boolean = false
+while (!finished && !timeoutExceeded()) {
+  taskRunner.synchronized {
+// We need to synchronize on the TaskRunner while checking 
whether the task has
+// finished in order to avoid a race where the task is marked 
as finished right after
+// we check and before we call wait().
+if (taskRunner.isFinished) {
+  finished = true
+} else {
+  taskRunner.wait(killPollingIntervalMs)
--- End diff --

oh, the notifyAll is used for this, right?
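
A minimal sketch of the handshake being discussed, with illustrative names rather than the actual Executor internals: the task thread marks itself finished and calls `notifyAll()` under the same lock, so a reaper blocked in `wait(pollingIntervalMs)` wakes up as soon as the task exits instead of sleeping out the full polling interval.

```scala
// Hedged sketch of the TaskRunner/TaskReaper handshake; names are illustrative.
class RunnerSketch {
  private var finished = false

  // Called by the task thread when the task exits (normally or after being killed).
  def markFinished(): Unit = this.synchronized {
    finished = true
    notifyAll()  // wake any reaper blocked in wait() on this object
  }

  // Called by the reaper loop: waits at most pollingIntervalMs, returning early if
  // markFinished() notifies, and reports whether the task has finished.
  def awaitFinished(pollingIntervalMs: Long): Boolean = this.synchronized {
    if (!finished) wait(pollingIntervalMs)
    finished
  }
}
```

Checking and waiting while holding the same monitor is what avoids the race described in the comment above.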





[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...

2016-12-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r92905603
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -432,6 +465,93 @@ private[spark] class Executor(
   }
 
   /**
+   * Supervises the killing / cancellation of a task by sending the 
interrupted flag, optionally
+   * sending a Thread.interrupt(), and monitoring the task until it 
finishes.
+   */
+  private class TaskReaper(
+  taskRunner: TaskRunner,
+  val interruptThread: Boolean)
+extends Runnable {
+
+private[this] val taskId: Long = taskRunner.taskId
+
+private[this] val killPollingIntervalMs: Long =
+  conf.getTimeAsMs("spark.task.reaper.pollingInterval", "10s")
+
+private[this] val killTimeoutMs: Long = 
conf.getTimeAsMs("spark.task.reaper.killTimeout", "2m")
+
+private[this] val takeThreadDump: Boolean =
+  conf.getBoolean("spark.task.reaper.threadDump", true)
+
+override def run(): Unit = {
+  val startTimeMs = System.currentTimeMillis()
+  def elapsedTimeMs = System.currentTimeMillis() - startTimeMs
+  def timeoutExceeded(): Boolean = killTimeoutMs > 0 && elapsedTimeMs 
> killTimeoutMs
+  try {
+// Only attempt to kill the task once. If interruptThread = false 
then a second kill
+// attempt would be a no-op and if interruptThread = true then it 
may not be safe or
+// effective to interrupt multiple times:
+taskRunner.kill(interruptThread = interruptThread)
+// Monitor the killed task until it exits:
+var finished: Boolean = false
+while (!finished && !timeoutExceeded()) {
+  taskRunner.synchronized {
+// We need to synchronize on the TaskRunner while checking 
whether the task has
+// finished in order to avoid a race where the task is marked 
as finished right after
+// we check and before we call wait().
+if (taskRunner.isFinished) {
+  finished = true
+} else {
+  taskRunner.wait(killPollingIntervalMs)
+}
+  }
+  if (taskRunner.isFinished) {
+finished = true
+  } else {
+logWarning(s"Killed task $taskId is still running after 
$elapsedTimeMs ms")
+if (takeThreadDump) {
+  try {
+
Utils.getThreadDumpForThread(taskRunner.getThreadId).foreach { thread =>
+  if (thread.threadName == taskRunner.threadName) {
+logWarning(s"Thread dump from task 
$taskId:\n${thread.stackTrace}")
+  }
+}
+  } catch {
+case NonFatal(e) =>
+  logWarning("Exception thrown while obtaining thread 
dump: ", e)
+  }
+}
+  }
+}
+
+if (!taskRunner.isFinished && timeoutExceeded()) {
+  if (isLocal) {
+logError(s"Killed task $taskId could not be stopped within 
$killTimeoutMs ms; " +
+  "not killing JVM because we are running in local mode.")
+  } else {
+throw new SparkException(
+  s"Killing executor JVM because killed task $taskId could not 
be stopped within " +
+s"$killTimeoutMs ms.")
--- End diff --

I guess I am not clear how we kill the JVM. Are we using this exception to 
kill the JVM?
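
My understanding (an assumption about the mechanism rather than a statement about the exact Spark code path) is that the exception is allowed to escape the reaper's `run()` so that the thread's uncaught-exception handler terminates the JVM. A minimal sketch of that pattern, with made-up names:

```scala
// Hedged sketch: an exception escaping a thread's run() reaches the uncaught-exception
// handler, which halts the JVM. Names and messages are illustrative.
object JvmKillSketch {
  def main(args: Array[String]): Unit = {
    Thread.setDefaultUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler {
      override def uncaughtException(t: Thread, e: Throwable): Unit = {
        System.err.println(s"Uncaught exception in ${t.getName}: ${e.getMessage}; halting JVM")
        Runtime.getRuntime.halt(1)  // halt() skips shutdown hooks; System.exit would run them
      }
    })
    val reaper = new Thread(new Runnable {
      // Throwing here is what ultimately brings the JVM down, via the handler above.
      override def run(): Unit =
        throw new RuntimeException("Killed task could not be stopped within the timeout")
    }, "task-reaper-sketch")
    reaper.start()
    reaper.join()
  }
}
```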





[GitHub] spark pull request #16189: [SPARK-18761][CORE] Introduce "task reaper" to ov...

2016-12-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/16189#discussion_r92904925
  
--- Diff: core/src/main/scala/org/apache/spark/executor/Executor.scala ---
@@ -229,9 +259,12 @@ private[spark] class Executor(
   // ClosedByInterruptException during execBackend.statusUpdate which 
causes
   // Executor to crash
   Thread.interrupted()
+  notifyAll()
--- End diff --

How about we also add a comment explaining the purpose of this `notifyAll`?





[GitHub] spark issue #16189: [SPARK-18761][CORE] Introduce "task reaper" to oversee t...

2016-12-16 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16189
  
I am reviewing it now





[GitHub] spark issue #16189: [SPARK-18761][CORE] Introduce "task reaper" to oversee t...

2016-12-16 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16189
  
let's trigger more tests and see if the test is flaky.





[GitHub] spark issue #16189: [SPARK-18761][CORE] Introduce "task reaper" to oversee t...

2016-12-16 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16189
  
test this please





[GitHub] spark issue #16288: [SPARK-18869][SQL] Add TreeNode.p that returns BaseType

2016-12-14 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16288
  
lgtm





[GitHub] spark issue #16277: [SPARK-18854][SQL] numberedTreeString and apply(i) incon...

2016-12-13 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16277
  
LGTM





[GitHub] spark issue #16268: [SPARK-18843][Core]Fix timeout in awaitResultInForkJoinS...

2016-12-13 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16268
  
LGTM





spark git commit: [SPARK-13747][CORE] Fix potential ThreadLocal leaks in RPC when using ForkJoinPool

2016-12-13 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master d53f18cae -> fb3081d3b


[SPARK-13747][CORE] Fix potential ThreadLocal leaks in RPC when using 
ForkJoinPool

## What changes were proposed in this pull request?

Some places in SQL may call `RpcEndpointRef.askWithRetry` (e.g., ParquetFileFormat.buildReader -> SparkContext.broadcast -> ... -> BlockManagerMaster.updateBlockInfo -> RpcEndpointRef.askWithRetry), which eventually calls `Await.result`. This may cause `java.lang.IllegalArgumentException: spark.sql.execution.id is already set` when running in a Scala ForkJoinPool.

This PR includes the following changes to fix this issue:

- Remove `ThreadUtils.awaitResult`
- Rename `ThreadUtils.awaitResultInForkJoinSafely` to `ThreadUtils.awaitResult` (see the sketch after this list)
- Replace `Await.result` in RpcTimeout with `ThreadUtils.awaitResult`.
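
A minimal sketch of the idea behind the renamed helper, under the assumption that the ForkJoinPool-safe variant bypasses `Await.result`/`BlockContext` by calling `Awaitable.result` directly; the wrapper exception and object name are illustrative:

```scala
import scala.concurrent.{Awaitable, CanAwait}
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal

object AwaitSketch {
  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
    try {
      // CanAwait is only a compile-time marker, so passing null lets us call result()
      // directly without going through Await.result and its BlockContext machinery.
      awaitable.result(atMost)(null.asInstanceOf[CanAwait])
    } catch {
      case NonFatal(t) =>
        // Re-throw wrapped so the calling thread's stack trace shows up in logs.
        throw new RuntimeException("Exception thrown in awaitResult", t)
    }
  }
}
```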

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16230 from zsxwing/fix-SPARK-13747.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb3081d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb3081d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb3081d3

Branch: refs/heads/master
Commit: fb3081d3b38a50aa5e023c603e1b191e57f7c876
Parents: d53f18c
Author: Shixiong Zhu 
Authored: Tue Dec 13 09:53:22 2016 -0800
Committer: Yin Huai 
Committed: Tue Dec 13 09:53:22 2016 -0800

--
 .../scala/org/apache/spark/rpc/RpcTimeout.scala | 12 ++
 .../org/apache/spark/util/ThreadUtils.scala | 41 
 .../apache/spark/rdd/AsyncRDDActionsSuite.scala |  3 +-
 .../OutputCommitCoordinatorSuite.scala  |  3 +-
 scalastyle-config.xml   |  1 -
 .../sql/execution/basicPhysicalOperators.scala  |  2 +-
 .../exchange/BroadcastExchangeExec.scala|  3 +-
 7 files changed, 23 insertions(+), 42 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fb3081d3/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
--
diff --git a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala 
b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
index 2761d39..efd2648 100644
--- a/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
+++ b/core/src/main/scala/org/apache/spark/rpc/RpcTimeout.scala
@@ -24,7 +24,7 @@ import scala.concurrent.duration._
 import scala.util.control.NonFatal
 
 import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{ThreadUtils, Utils}
 
 /**
  * An exception thrown if RpcTimeout modifies a [[TimeoutException]].
@@ -72,15 +72,9 @@ private[spark] class RpcTimeout(val duration: 
FiniteDuration, val timeoutProp: S
* is still not ready
*/
   def awaitResult[T](future: Future[T]): T = {
-val wrapAndRethrow: PartialFunction[Throwable, T] = {
-  case NonFatal(t) =>
-throw new SparkException("Exception thrown in awaitResult", t)
-}
 try {
-  // scalastyle:off awaitresult
-  Await.result(future, duration)
-  // scalastyle:on awaitresult
-} catch addMessageIfTimeout.orElse(wrapAndRethrow)
+  ThreadUtils.awaitResult(future, duration)
+} catch addMessageIfTimeout
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fb3081d3/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
--
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala 
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index 60a6e82..1aa4456 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -19,7 +19,7 @@ package org.apache.spark.util
 
 import java.util.concurrent._
 
-import scala.concurrent.{Await, Awaitable, ExecutionContext, 
ExecutionContextExecutor}
+import scala.concurrent.{Awaitable, ExecutionContext, ExecutionContextExecutor}
 import scala.concurrent.duration.Duration
 import scala.concurrent.forkjoin.{ForkJoinPool => SForkJoinPool, 
ForkJoinWorkerThread => SForkJoinWorkerThread}
 import scala.util.control.NonFatal
@@ -180,39 +180,30 @@ private[spark] object ThreadUtils {
 
   // scalastyle:off awaitresult
   /**
-   * Preferred alternative to `Await.result()`. This method wraps and 
re-throws any exceptions
-   * thrown by the underlying `Await` call, ensuring that this thread's stack 
trace appears in
-   * logs.
-   */
-  @throws(classOf[SparkException])
-  def awaitResult[T](awaitable: Awaitable[T], atMost: Duration): T = {
-try {
-  Await.result(awaitable, atMost)
-  // scalastyle:on awaitresult
-} catch {
-  case 

[GitHub] spark issue #16230: [SPARK-13747][Core]Fix potential ThreadLocal leaks in RP...

2016-12-13 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16230
  
Merging to master





[GitHub] spark issue #16230: [SPARK-13747][Core]Fix potential ThreadLocal leaks in RP...

2016-12-13 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16230
  
LGTM





spark git commit: [SPARK-18675][SQL] CTAS for hive serde table should work for all hive versions

2016-12-13 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 096f868b7 -> d53f18cae


[SPARK-18675][SQL] CTAS for hive serde table should work for all hive versions

## What changes were proposed in this pull request?

Before Hive 1.1, when inserting into a table, Hive will create the staging directory under a common scratch directory. After the writing is finished, Hive will simply empty the table directory and move the staging directory to it.

Since Hive 1.1, Hive will create the staging directory under the table directory, and when moving the staging directory to the table directory, Hive will still empty the table directory but will exclude the staging directory.

In `InsertIntoHiveTable`, we simply copied the code from Hive 1.2, which means we will always create the staging directory under the table directory, no matter what the Hive version is. This causes problems if the Hive version is prior to 1.1, because the staging directory will be removed by Hive when Hive tries to empty the table directory.

This PR copies the code from Hive 0.13, so that we have two branches for creating the staging directory. If the Hive version is prior to 1.1, we go to the old-style branch (i.e. create the staging directory under a common scratch directory); otherwise we go to the new-style branch (i.e. create the staging directory under the table directory), as sketched below.
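
A rough sketch of the two branches; the helper name, parameters, and directory names below are illustrative placeholders, not the code added in this PR:

```scala
// Hedged sketch: choose where the staging directory lives based on the Hive version.
object StagingDirSketch {
  def stagingPathFor(
      hiveVersionBefore11: Boolean,
      tableDir: String,
      scratchDir: String,
      stagingDirName: String = ".hive-staging"): String = {
    if (hiveVersionBefore11) {
      // Old style (Hive < 1.1): stage under the shared scratch directory, so emptying the
      // table directory during the final move cannot delete the staged data.
      s"${scratchDir.stripSuffix("/")}/$stagingDirName"
    } else {
      // New style (Hive >= 1.1): stage under the table directory; Hive excludes it when
      // emptying the table directory.
      s"${tableDir.stripSuffix("/")}/$stagingDirName"
    }
  }
}
```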

## How was this patch tested?

new test

Author: Wenchen Fan 

Closes #16104 from cloud-fan/hive-0.13.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d53f18ca
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d53f18ca
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d53f18ca

Branch: refs/heads/master
Commit: d53f18cae41c6c77a0cff3f1fd266e4c1b9ea79a
Parents: 096f868
Author: Wenchen Fan 
Authored: Tue Dec 13 09:46:58 2016 -0800
Committer: Yin Huai 
Committed: Tue Dec 13 09:46:58 2016 -0800

--
 .../hive/execution/InsertIntoHiveTable.scala| 68 +---
 .../spark/sql/hive/client/VersionsSuite.scala   | 19 +-
 2 files changed, 75 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d53f18ca/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index db2239d..82c7b1a 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -22,7 +22,6 @@ import java.net.URI
 import java.text.SimpleDateFormat
 import java.util.{Date, Locale, Random}
 
-import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.common.FileUtils
 import org.apache.hadoop.hive.ql.exec.TaskRunner
@@ -86,6 +85,7 @@ case class InsertIntoHiveTable(
 
   val hadoopConf = sessionState.newHadoopConf()
   val stagingDir = hadoopConf.get("hive.exec.stagingdir", ".hive-staging")
+  val scratchDir = hadoopConf.get("hive.exec.scratchdir", "/tmp/hive")
 
   private def executionId: String = {
 val rand: Random = new Random
@@ -93,7 +93,7 @@ case class InsertIntoHiveTable(
 "hive_" + format.format(new Date) + "_" + Math.abs(rand.nextLong)
   }
 
-  private def getStagingDir(inputPath: Path, hadoopConf: Configuration): Path 
= {
+  private def getStagingDir(inputPath: Path): Path = {
 val inputPathUri: URI = inputPath.toUri
 val inputPathName: String = inputPathUri.getPath
 val fs: FileSystem = inputPath.getFileSystem(hadoopConf)
@@ -121,21 +121,69 @@ case class InsertIntoHiveTable(
 return dir
   }
 
-  private def getExternalScratchDir(extURI: URI, hadoopConf: Configuration): 
Path = {
-getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, 
extURI.getPath), hadoopConf)
+  private def getExternalScratchDir(extURI: URI): Path = {
+getStagingDir(new Path(extURI.getScheme, extURI.getAuthority, 
extURI.getPath))
   }
 
-  def getExternalTmpPath(path: Path, hadoopConf: Configuration): Path = {
+  def getExternalTmpPath(path: Path): Path = {
+import org.apache.spark.sql.hive.client.hive._
+
+val hiveVersion = 
externalCatalog.asInstanceOf[HiveExternalCatalog].client.version
+// Before Hive 1.1, when inserting into a table, Hive will create the 
staging directory under
+// a common scratch directory. After the writing is finished, Hive will 
simply empty the table
+// directory and move the staging directory to it.
+// After Hive 1.1, Hive will 

[GitHub] spark issue #16104: [SPARK-18675][SQL] CTAS for hive serde table should work...

2016-12-13 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16104
  
LGTM. Thanks. Merging to master





spark git commit: [SPARK-18631][SQL] Changed ExchangeCoordinator re-partitioning to avoid more data skew

2016-11-29 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master d57a594b8 -> f8878a4c6


[SPARK-18631][SQL] Changed ExchangeCoordinator re-partitioning to avoid more 
data skew

## What changes were proposed in this pull request?

The re-partitioning logic in ExchangeCoordinator was changed so that another pre-shuffle partition is not added to the current post-shuffle partition if doing so would cause the size of the post-shuffle partition to exceed the target partition size (see the sketch below).
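
To make the rule concrete, here is a small self-contained sketch of the packing logic (an illustration, not the ExchangeCoordinator code itself); it reproduces the example from the updated scaladoc in the diff below:

```scala
// Hedged sketch: start a new post-shuffle partition whenever adding the next pre-shuffle
// partition would push the running size past the target.
object PackingSketch {
  def partitionStartIndices(preShuffleSizes: Array[Long], targetSize: Long): Seq[Int] = {
    val starts = scala.collection.mutable.ArrayBuffer(0)
    var runningSize = 0L
    for (i <- preShuffleSizes.indices) {
      if (i > 0 && runningSize + preShuffleSizes(i) > targetSize) {
        starts += i        // close the current post-shuffle partition
        runningSize = 0L
      }
      runningSize += preShuffleSizes(i)
    }
    starts.toSeq
  }

  def main(args: Array[String]): Unit = {
    // Combined per-partition sizes of the two example stages: [110, 30, 170, 15, 35] MB
    // with a 128 MB target.
    val starts = partitionStartIndices(Array(110L, 30L, 170L, 15L, 35L), 128L)
    println(starts)  // start indices 0, 1, 2, 3 -> partitions of 110, 30, 170 and 50 MB
  }
}
```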

## How was this patch tested?

Existing tests updated to reflect new expectations.

Author: Mark Hamstra 

Closes #16065 from markhamstra/SPARK-17064.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f8878a4c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f8878a4c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f8878a4c

Branch: refs/heads/master
Commit: f8878a4c6f7c4ebb16e4aef26ad0869ba12eb9fc
Parents: d57a594
Author: Mark Hamstra 
Authored: Tue Nov 29 15:01:12 2016 -0800
Committer: Yin Huai 
Committed: Tue Nov 29 15:01:12 2016 -0800

--
 .../exchange/ExchangeCoordinator.scala  | 32 
 .../execution/ExchangeCoordinatorSuite.scala| 40 ++--
 2 files changed, 35 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f8878a4c/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
index 57da85f..deb2c24 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala
@@ -69,15 +69,18 @@ import org.apache.spark.sql.execution.{ShuffledRowRDD, 
SparkPlan}
  * post-shuffle partition. Once we have size statistics of pre-shuffle 
partitions from stages
  * corresponding to the registered [[ShuffleExchange]]s, we will do a pass of 
those statistics and
  * pack pre-shuffle partitions with continuous indices to a single 
post-shuffle partition until
- * the size of a post-shuffle partition is equal or greater than the target 
size.
+ * adding another pre-shuffle partition would cause the size of a post-shuffle 
partition to be
+ * greater than the target size.
+ *
  * For example, we have two stages with the following pre-shuffle partition 
size statistics:
  * stage 1: [100 MB, 20 MB, 100 MB, 10MB, 30 MB]
  * stage 2: [10 MB,  10 MB, 70 MB,  5 MB, 5 MB]
  * assuming the target input size is 128 MB, we will have three post-shuffle 
partitions,
  * which are:
- *  - post-shuffle partition 0: pre-shuffle partition 0 and 1
- *  - post-shuffle partition 1: pre-shuffle partition 2
- *  - post-shuffle partition 2: pre-shuffle partition 3 and 4
+ *  - post-shuffle partition 0: pre-shuffle partition 0 (size 110 MB)
+ *  - post-shuffle partition 1: pre-shuffle partition 1 (size 30 MB)
+ *  - post-shuffle partition 2: pre-shuffle partition 2 (size 170 MB)
+ *  - post-shuffle partition 3: pre-shuffle partition 3 and 4 (size 50 MB)
  */
 class ExchangeCoordinator(
 numExchanges: Int,
@@ -164,25 +167,20 @@ class ExchangeCoordinator(
 while (i < numPreShufflePartitions) {
   // We calculate the total size of ith pre-shuffle partitions from all 
pre-shuffle stages.
   // Then, we add the total size to postShuffleInputSize.
+  var nextShuffleInputSize = 0L
   var j = 0
   while (j < mapOutputStatistics.length) {
-postShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
+nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i)
 j += 1
   }
 
-  // If the current postShuffleInputSize is equal or greater than the
-  // targetPostShuffleInputSize, We need to add a new element in 
partitionStartIndices.
-  if (postShuffleInputSize >= targetPostShuffleInputSize) {
-if (i < numPreShufflePartitions - 1) {
-  // Next start index.
-  partitionStartIndices += i + 1
-} else {
-  // This is the last element. So, we do not need to append the next 
start index to
-  // partitionStartIndices.
-}
+  // If including the nextShuffleInputSize would exceed the target 
partition size, then start a
+  // new partition.
+  if (i > 0 && postShuffleInputSize + nextShuffleInputSize > 
targetPostShuffleInputSize) {
+partitionStartIndices += i
 // reset postShuffleInputSize.
-postShuffleInputSize = 0L
-  }
+

[GitHub] spark issue #16065: [SPARK-18631][SQL] Changed ExchangeCoordinator re-partit...

2016-11-29 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16065
  
Thanks @markhamstra Merging to master.





[GitHub] spark issue #16065: [SPARK-18631][SQL] Changed ExchangeCoordinator re-partit...

2016-11-29 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16065
  
lgtm





[GitHub] spark pull request #14638: [SPARK-11374][SQL] Support `skip.header.line.coun...

2016-11-29 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14638#discussion_r90098793
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -122,10 +126,20 @@ class HadoopTableReader(
 val attrsWithIndex = attributes.zipWithIndex
 val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
 
-val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
+val deserializedHadoopRDD = hadoopRDD.mapPartitionsWithIndex { (index, 
iter) =>
   val hconf = broadcastedHadoopConf.value.value
   val deserializer = deserializerClass.newInstance()
   deserializer.initialize(hconf, tableDesc.getProperties)
+  if (skipHeaderLineCount > 0 && isTextInputFormatTable) {
+val partition = 
hadoopRDD.partitions(index).asInstanceOf[HadoopPartition]
--- End diff --

I am +1 on adding the check.





[GitHub] spark pull request #14638: [SPARK-11374][SQL] Support `skip.header.line.coun...

2016-11-29 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/14638#discussion_r90098551
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala ---
@@ -122,10 +126,20 @@ class HadoopTableReader(
 val attrsWithIndex = attributes.zipWithIndex
 val mutableRow = new SpecificInternalRow(attributes.map(_.dataType))
 
-val deserializedHadoopRDD = hadoopRDD.mapPartitions { iter =>
+val deserializedHadoopRDD = hadoopRDD.mapPartitionsWithIndex { (index, 
iter) =>
   val hconf = broadcastedHadoopConf.value.value
   val deserializer = deserializerClass.newInstance()
   deserializer.initialize(hconf, tableDesc.getProperties)
+  if (skipHeaderLineCount > 0 && isTextInputFormatTable) {
+val partition = 
hadoopRDD.partitions(index).asInstanceOf[HadoopPartition]
+if (partition.inputSplit.t.asInstanceOf[FileSplit].getStart() == 
0) {
--- End diff --

is `partition.inputSplit.t.asInstanceOf[FileSplit].getStart() != 0` tested 
(for a split that does not start from the start of the file, we do not skip)?
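
For context, a small sketch of the behaviour under discussion (illustrative, not the PR's code): the header should only be skipped for the split whose start offset is 0, since only that split contains the first lines of the file.

```scala
// Hedged sketch: drop the first skipHeaderLineCount records only when the split starts at
// byte offset 0; other splits begin mid-file and therefore contain no header lines.
object SkipHeaderSketch {
  def maybeSkipHeader[T](
      iter: Iterator[T],
      splitStartOffset: Long,
      skipHeaderLineCount: Int): Iterator[T] = {
    if (skipHeaderLineCount > 0 && splitStartOffset == 0L) iter.drop(skipHeaderLineCount)
    else iter
  }
}
```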





[GitHub] spark issue #15979: [SPARK-18251][SQL] the type of Dataset can't be Option o...

2016-11-28 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/15979
  
looks good. @liancheng want to double check?





[GitHub] spark pull request #15979: [SPARK-18251][SQL] the type of Dataset can't be O...

2016-11-28 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15979#discussion_r89936859
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 ---
@@ -47,6 +47,14 @@ object ExpressionEncoder {
 // We convert the not-serializable TypeTag into StructType and 
ClassTag.
 val mirror = typeTag[T].mirror
 val tpe = typeTag[T].tpe
+
+if (ScalaReflection.optionOfNonFlatType(tpe)) {
+  throw new UnsupportedOperationException(
+"Cannot create encoder for Option of non-flat type, as non-flat 
type is represented " +
+  "as a row, and the entire row can not be null in Spark SQL like 
normal databases. " +
+  "You can wrap your type with Tuple1 if you do want top level 
null objects.")
--- End diff --

Let's provide an example in the error message to help users understand how 
to handle this case.
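
For instance, the example in the error message could look something like the snippet below; the `Person` class and values are hypothetical, just to show the `Tuple1` workaround:

```scala
// Hedged illustration of the suggested workaround: wrap the Option of a non-flat
// (case class) type in Tuple1 so the whole value maps to a single nullable column.
// Assumes an existing SparkSession named `spark`.
case class Person(name: String, age: Int)

object OptionEncoderSketch {
  def example(spark: org.apache.spark.sql.SparkSession): Unit = {
    import spark.implicits._
    // A Dataset[Option[Person]] is rejected, because a whole row cannot be null:
    //   Seq(Some(Person("a", 1)), None).toDS()
    // Wrapping in Tuple1 works; None becomes a null value of the single column:
    val ds = Seq(Tuple1(Some(Person("a", 1))), Tuple1(Option.empty[Person])).toDS()
    ds.show()
  }
}
```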





spark git commit: [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino

2016-11-28 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 32b259fae -> 34ad4d520


[SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 
to match the version of org.codehaus.janino:janino

## What changes were proposed in this pull request?
org.codehaus.janino:janino depends on org.codehaus.janino:commons-compiler, and we have upgraded to org.codehaus.janino:janino 3.0.0.

However, it seems we are still pulling in org.codehaus.janino:commons-compiler 2.7.6 because of calcite. It looks like an accident, because we exclude janino from calcite (see https://github.com/apache/spark/blob/branch-2.1/pom.xml#L1759). So, this PR upgrades org.codehaus.janino:commons-compiler to 3.0.0.

## How was this patch tested?
jenkins

Author: Yin Huai <yh...@databricks.com>

Closes #16025 from yhuai/janino-commons-compile.

(cherry picked from commit eba727757ed5dc23c635e1926795aea62ec0fc66)
Signed-off-by: Yin Huai <yh...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/34ad4d52
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/34ad4d52
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/34ad4d52

Branch: refs/heads/branch-2.1
Commit: 34ad4d520ae0e4302972097c5985ab2c5a8d5e04
Parents: 32b259f
Author: Yin Huai <yh...@databricks.com>
Authored: Mon Nov 28 10:09:30 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Nov 28 10:09:50 2016 -0800

--
 dev/deps/spark-deps-hadoop-2.2 | 2 +-
 dev/deps/spark-deps-hadoop-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-2.4 | 2 +-
 dev/deps/spark-deps-hadoop-2.6 | 2 +-
 dev/deps/spark-deps-hadoop-2.7 | 2 +-
 pom.xml| 9 +
 sql/catalyst/pom.xml   | 4 
 7 files changed, 18 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.2
--
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index bbdea06..89bfcef 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -24,7 +24,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.3
--
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index a2dec41..8df3858 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.4
--
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index c1f02b9..71e7fb6 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 4f04636..ba31391 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/34ad4d52/dev/deps/spark-deps-hadoop-2.7
--
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index da3af9f..b129e5a 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 co

spark git commit: [SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 to match the version of org.codehaus.janino:janino

2016-11-28 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master 237c3b964 -> eba727757


[SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 3.0.0 
to match the version of org.codehaus.janino:janino

## What changes were proposed in this pull request?
org.codehaus.janino:janino depends on org.codehaus.janino:commons-compiler, and we have upgraded to org.codehaus.janino:janino 3.0.0.

However, it seems we are still pulling in org.codehaus.janino:commons-compiler 2.7.6 because of calcite. It looks like an accident, because we exclude janino from calcite (see https://github.com/apache/spark/blob/branch-2.1/pom.xml#L1759). So, this PR upgrades org.codehaus.janino:commons-compiler to 3.0.0.

## How was this patch tested?
jenkins

Author: Yin Huai <yh...@databricks.com>

Closes #16025 from yhuai/janino-commons-compile.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eba72775
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eba72775
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eba72775

Branch: refs/heads/master
Commit: eba727757ed5dc23c635e1926795aea62ec0fc66
Parents: 237c3b9
Author: Yin Huai <yh...@databricks.com>
Authored: Mon Nov 28 10:09:30 2016 -0800
Committer: Yin Huai <yh...@databricks.com>
Committed: Mon Nov 28 10:09:30 2016 -0800

--
 dev/deps/spark-deps-hadoop-2.2 | 2 +-
 dev/deps/spark-deps-hadoop-2.3 | 2 +-
 dev/deps/spark-deps-hadoop-2.4 | 2 +-
 dev/deps/spark-deps-hadoop-2.6 | 2 +-
 dev/deps/spark-deps-hadoop-2.7 | 2 +-
 pom.xml| 9 +
 sql/catalyst/pom.xml   | 4 
 7 files changed, 18 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.2
--
diff --git a/dev/deps/spark-deps-hadoop-2.2 b/dev/deps/spark-deps-hadoop-2.2
index bbdea06..89bfcef 100644
--- a/dev/deps/spark-deps-hadoop-2.2
+++ b/dev/deps/spark-deps-hadoop-2.2
@@ -24,7 +24,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.3
--
diff --git a/dev/deps/spark-deps-hadoop-2.3 b/dev/deps/spark-deps-hadoop-2.3
index a2dec41..8df3858 100644
--- a/dev/deps/spark-deps-hadoop-2.3
+++ b/dev/deps/spark-deps-hadoop-2.3
@@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.4
--
diff --git a/dev/deps/spark-deps-hadoop-2.4 b/dev/deps/spark-deps-hadoop-2.4
index c1f02b9..71e7fb6 100644
--- a/dev/deps/spark-deps-hadoop-2.4
+++ b/dev/deps/spark-deps-hadoop-2.4
@@ -27,7 +27,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.6
--
diff --git a/dev/deps/spark-deps-hadoop-2.6 b/dev/deps/spark-deps-hadoop-2.6
index 4f04636..ba31391 100644
--- a/dev/deps/spark-deps-hadoop-2.6
+++ b/dev/deps/spark-deps-hadoop-2.6
@@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/blob/eba72775/dev/deps/spark-deps-hadoop-2.7
--
diff --git a/dev/deps/spark-deps-hadoop-2.7 b/dev/deps/spark-deps-hadoop-2.7
index da3af9f..b129e5a 100644
--- a/dev/deps/spark-deps-hadoop-2.7
+++ b/dev/deps/spark-deps-hadoop-2.7
@@ -31,7 +31,7 @@ commons-beanutils-core-1.8.0.jar
 commons-cli-1.2.jar
 commons-codec-1.10.jar
 commons-collections-3.2.2.jar
-commons-compiler-2.7.6.jar
+commons-compiler-3.0.0.jar
 commons-compress-1.4.1.jar
 commons-configuration-1.6.jar
 commons-crypto-1.0.0.jar

http://git-wip-us.apache.org/repos/asf/spark/

[GitHub] spark issue #16025: [SPARK-18602] Set the version of org.codehaus.janino:com...

2016-11-28 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16025
  
Thanks. Since we use janino 3.0.0 starting with Spark 2.1, I am merging this PR to both master and branch 2.1.





[GitHub] spark pull request #16025: [SPARK-18602] Set the version of org.codehaus.jan...

2016-11-27 Thread yhuai
GitHub user yhuai opened a pull request:

https://github.com/apache/spark/pull/16025

[SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 
3.0.0 to match the version of org.codehaus.janino:janino

## What changes were proposed in this pull request?
org.codehaus.janino:janino depends on org.codehaus.janino:commons-compiler, and we have upgraded to org.codehaus.janino:janino 3.0.0.

However, it seems we are still pulling in org.codehaus.janino:commons-compiler 2.7.6 because of calcite. It looks like an accident, because we exclude janino from calcite (see https://github.com/apache/spark/blob/branch-2.1/pom.xml#L1759). So, this PR upgrades org.codehaus.janino:commons-compiler to 3.0.0.

## How was this patch tested?
jenkins


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yhuai/spark janino-commons-compile

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16025.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16025


commit c5685f0bea849f0b7365c8fb161b8a1f0958383b
Author: Yin Huai <yh...@databricks.com>
Date:   2016-11-27T23:43:25Z

[SPARK-18602] Set the version of org.codehaus.janino:commons-compiler to 
3.0.0 to match the version of org.codehaus.janino:janino







[GitHub] spark issue #16025: [SPARK-18602] Set the version of org.codehaus.janino:com...

2016-11-27 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/16025
  
@kiszk want to take a look?





[GitHub] spark issue #15062: SPARK-17424: Fix unsound substitution bug in ScalaReflec...

2016-11-21 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/15062
  
Any chance this is the same issue as https://issues.apache.org/jira/browse/SPARK-17109?
@rdblue When you were debugging this issue, which version of Scala did you use, 2.10 or 2.11? If you were using Scala 2.10, is it possible to try Scala 2.11? Thanks!





spark git commit: [SPARK-18360][SQL] default table path of tables in default database should depend on the location of default database

2016-11-17 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 978798880 -> fc466be4f


[SPARK-18360][SQL] default table path of tables in default database should 
depend on the location of default database

## What changes were proposed in this pull request?

The current semantics of the warehouse config are:

1. It's a static config, which means you can't change it once your Spark application is launched.
2. Once a database is created, its location won't change even if the warehouse path config is changed.
3. The default database is a special case: although its location is fixed, the locations of tables created in it are not. If a Spark app starts with warehouse path B (while the location of the default database is A) and users create a table `tbl` in the default database, its location will be `B/tbl` instead of `A/tbl`. If users then change the warehouse path config to C and create another table `tbl2`, its location will still be `B/tbl2` instead of `C/tbl2`.

Rule 3 doesn't make sense, and I think we introduced it by mistake rather than intentionally. Data source tables don't follow rule 3 and treat the default database like a normal one.

This PR fixes Hive serde tables to make them consistent with data source tables (a sketch of the intended path resolution follows).
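
A tiny sketch of the intended semantics (the helper name is illustrative, not the HiveExternalCatalog API): the default location of a table is derived from its database's location, so the default database is no longer a special case.

```scala
// Hedged sketch: derive the default table path from the database location rather than
// from whatever the warehouse config happens to be when the table is created.
object DefaultTablePathSketch {
  def defaultTablePath(dbLocationUri: String, tableName: String): String =
    s"${dbLocationUri.stripSuffix("/")}/$tableName"

  def main(args: Array[String]): Unit = {
    // A table created in a database located at A ends up under A, regardless of what the
    // warehouse path config is later changed to.
    println(defaultTablePath("hdfs://nn/user/hive/warehouse/mydb.db", "tbl"))
    // -> hdfs://nn/user/hive/warehouse/mydb.db/tbl
  }
}
```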

## How was this patch tested?

HiveSparkSubmitSuite

Author: Wenchen Fan 

Closes #15812 from cloud-fan/default-db.

(cherry picked from commit ce13c2672318242748f7520ed4ce6bcfad4fb428)
Signed-off-by: Yin Huai 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fc466be4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fc466be4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fc466be4

Branch: refs/heads/branch-2.1
Commit: fc466be4fd8def06880f59d50e5567c22cc53d6a
Parents: 9787988
Author: Wenchen Fan 
Authored: Thu Nov 17 17:31:12 2016 -0800
Committer: Yin Huai 
Committed: Thu Nov 17 17:31:43 2016 -0800

--
 .../spark/sql/hive/HiveExternalCatalog.scala| 237 ++-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |  76 +-
 2 files changed, 190 insertions(+), 123 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fc466be4/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8433058..cacffcf 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -197,136 +197,151 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 
 if (tableDefinition.tableType == VIEW) {
   client.createTable(tableDefinition, ignoreIfExists)
-} else if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
-  // Here we follow data source tables and put table metadata like 
provider, schema, etc. in
-  // table properties, so that we can work around the Hive metastore issue 
about not case
-  // preserving and make Hive serde table support mixed-case column names.
-  val tableWithDataSourceProps = tableDefinition.copy(
-properties = tableDefinition.properties ++ 
tableMetaToTableProps(tableDefinition))
-  client.createTable(tableWithDataSourceProps, ignoreIfExists)
 } else {
-  // To work around some hive metastore issues, e.g. not case-preserving, 
bad decimal type
-  // support, no column nullability, etc., we should do some extra works 
before saving table
-  // metadata into Hive metastore:
-  //  1. Put table metadata like provider, schema, etc. in table 
properties.
-  //  2. Check if this table is hive compatible.
-  //2.1  If it's not hive compatible, set location URI, schema, 
partition columns and bucket
-  // spec to empty and save table metadata to Hive.
-  //2.2  If it's hive compatible, set serde information in table 
metadata and try to save
-  // it to Hive. If it fails, treat it as not hive compatible and 
go back to 2.1
-  val tableProperties = tableMetaToTableProps(tableDefinition)
-
   // Ideally we should not create a managed table with location, but Hive 
serde table can
   // specify location for managed table. And in 
[[CreateDataSourceTableAsSelectCommand]] we have
   // to create the table directory and write out data before we create 
this table, to avoid
   // exposing a partial written table.
   val needDefaultTableLocation = tableDefinition.tableType == MANAGED &&
 tableDefinition.storage.locationUri.isEmpty
+
   val 

spark git commit: [SPARK-18360][SQL] default table path of tables in default database should depend on the location of default database

2016-11-17 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master b0aa1aa1a -> ce13c2672


[SPARK-18360][SQL] default table path of tables in default database should 
depend on the location of default database

## What changes were proposed in this pull request?

The current semantics of the warehouse config are:

1. It's a static config, which means you can't change it once your Spark application is launched.
2. Once a database is created, its location won't change even if the warehouse path config is changed.
3. The default database is a special case: although its location is fixed, the locations of tables created in it are not. If a Spark app starts with warehouse path B (while the location of the default database is A) and users create a table `tbl` in the default database, its location will be `B/tbl` instead of `A/tbl`. If users then change the warehouse path config to C and create another table `tbl2`, its location will still be `B/tbl2` instead of `C/tbl2`.

Rule 3 doesn't make sense, and I think we introduced it by mistake rather than intentionally. Data source tables don't follow rule 3 and treat the default database like a normal one.

This PR fixes Hive serde tables to make them consistent with data source tables.

## How was this patch tested?

HiveSparkSubmitSuite

Author: Wenchen Fan 

Closes #15812 from cloud-fan/default-db.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ce13c267
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ce13c267
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ce13c267

Branch: refs/heads/master
Commit: ce13c2672318242748f7520ed4ce6bcfad4fb428
Parents: b0aa1aa
Author: Wenchen Fan 
Authored: Thu Nov 17 17:31:12 2016 -0800
Committer: Yin Huai 
Committed: Thu Nov 17 17:31:12 2016 -0800

--
 .../spark/sql/hive/HiveExternalCatalog.scala| 237 ++-
 .../spark/sql/hive/HiveSparkSubmitSuite.scala   |  76 +-
 2 files changed, 190 insertions(+), 123 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ce13c267/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 8433058..cacffcf 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -197,136 +197,151 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
 
 if (tableDefinition.tableType == VIEW) {
   client.createTable(tableDefinition, ignoreIfExists)
-} else if (tableDefinition.provider.get == DDLUtils.HIVE_PROVIDER) {
-  // Here we follow data source tables and put table metadata like 
provider, schema, etc. in
-  // table properties, so that we can work around the Hive metastore issue 
about not case
-  // preserving and make Hive serde table support mixed-case column names.
-  val tableWithDataSourceProps = tableDefinition.copy(
-properties = tableDefinition.properties ++ 
tableMetaToTableProps(tableDefinition))
-  client.createTable(tableWithDataSourceProps, ignoreIfExists)
 } else {
-  // To work around some hive metastore issues, e.g. not case-preserving, 
bad decimal type
-  // support, no column nullability, etc., we should do some extra works 
before saving table
-  // metadata into Hive metastore:
-  //  1. Put table metadata like provider, schema, etc. in table 
properties.
-  //  2. Check if this table is hive compatible.
-  //2.1  If it's not hive compatible, set location URI, schema, 
partition columns and bucket
-  // spec to empty and save table metadata to Hive.
-  //2.2  If it's hive compatible, set serde information in table 
metadata and try to save
-  // it to Hive. If it fails, treat it as not hive compatible and 
go back to 2.1
-  val tableProperties = tableMetaToTableProps(tableDefinition)
-
   // Ideally we should not create a managed table with location, but Hive 
serde table can
   // specify location for managed table. And in 
[[CreateDataSourceTableAsSelectCommand]] we have
   // to create the table directory and write out data before we create 
this table, to avoid
   // exposing a partial written table.
   val needDefaultTableLocation = tableDefinition.tableType == MANAGED &&
 tableDefinition.storage.locationUri.isEmpty
+
   val tableLocation = if (needDefaultTableLocation) {
 Some(defaultTablePath(tableDefinition.identifier))
   } else {
 

[GitHub] spark issue #15812: [SPARK-18360][SQL] default table path of tables in defau...

2016-11-17 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/15812
  
LGTM. Merging to master and branch 2.1.





[GitHub] spark issue #15922: [SPARK-18462] Fix ClassCastException in SparkListenerDri...

2016-11-17 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/15922
  
lgtm pending jenkins





[GitHub] spark issue #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedImperativ...

2016-11-16 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/15703
  
LGTM. Merging to master.





spark git commit: [SPARK-18186] Migrate HiveUDAFFunction to TypedImperativeAggregate for partial aggregation support

2016-11-16 Thread yhuai
Repository: spark
Updated Branches:
  refs/heads/master a36a76ac4 -> 2ca8ae9aa


[SPARK-18186] Migrate HiveUDAFFunction to TypedImperativeAggregate for partial 
aggregation support

## What changes were proposed in this pull request?

While being evaluated in Spark SQL, Hive UDAFs don't support partial 
aggregation. This PR migrates `HiveUDAFFunction`s to 
`TypedImperativeAggregate`, which already provides partial aggregation support 
for aggregate functions that may use arbitrary Java objects as aggregation 
states.

The following snippet shows the effect of this PR:

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax
sql(s"CREATE FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")

spark.range(100).createOrReplaceTempView("t")

// A query using both Spark SQL native `max` and Hive `max`
sql(s"SELECT max(id), hive_max(id) FROM t").explain()
```

Before this PR:

```
== Physical Plan ==
SortAggregate(key=[], functions=[max(id#1L), default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax@7475f57e), id#1L, false, 0, 0)])
+- Exchange SinglePartition
   +- *Range (0, 100, step=1, splits=Some(1))
```

After this PR:

```
== Physical Plan ==
SortAggregate(key=[], functions=[max(id#1L), default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax@5e18a6a7), id#1L, false, 0, 0)])
+- Exchange SinglePartition
   +- SortAggregate(key=[], functions=[partial_max(id#1L), partial_default.hive_max(default.hive_max, HiveFunctionWrapper(org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax,org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax@5e18a6a7), id#1L, false, 0, 0)])
      +- *Range (0, 100, step=1, splits=Some(1))
```

The tricky part of the PR is mostly about updating and passing around aggregation states of
`HiveUDAFFunction`s, since the aggregation state of a Hive UDAF may appear in three different
forms. Let's take a look at the testing `MockUDAF` added in this PR as an example. This UDAF
computes the count of non-null values together with the count of nulls of a given column. Its
aggregation state may appear in the following forms at different times:

1. A `MockUDAFBuffer`, which is a concrete subclass of 
`GenericUDAFEvaluator.AggregationBuffer`

   The form used by the Hive UDAF API. This form is required in the following scenarios:

   - Calling `GenericUDAFEvaluator.iterate()` to update an existing aggregation 
state with new input values.
   - Calling `GenericUDAFEvaluator.terminate()` to get the final aggregated 
value from an existing aggregation state.
   - Calling `GenericUDAFEvaluator.merge()` to merge other aggregation states 
into an existing aggregation state.

 The existing aggregation state to be updated must be in this form.

   Conversions:

   - To form 2:

 `GenericUDAFEvaluator.terminatePartial()`

   - To form 3:

 Convert to form 2 first, and then to 3.

2. An `Object[]` array containing two `java.lang.Long` values.

   The form used to interact with Hive's `ObjectInspector`s. This form is required in the
following scenarios:

   - Calling `GenericUDAFEvaluator.terminatePartial()` to convert an existing 
aggregation state in form 1 to form 2.
   - Calling `GenericUDAFEvaluator.merge()` to merge other aggregation states 
into an existing aggregation state.

 The input aggregation state must be in this form.

   Conversions:

   - To form 1:

  No direct method. We have to create an empty `AggregationBuffer` and merge the form 2 value
into that empty buffer.

   - To form 3:

 `unwrapperFor()`/`unwrap()` method of `HiveInspectors`

3. The byte array that holds data of an `UnsafeRow` with two `LongType` fields.

   The form used by Spark SQL to shuffle partial aggregation results. This form 
is required because `TypedImperativeAggregate` always asks its subclasses to 
serialize their aggregation states into a byte array.

   Conversions:

   - To form 1:

 Convert to form 2 first, and then to 1.

   - To form 2:

 `wrapperFor()`/`wrap()` method of `HiveInspectors`
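
As an illustration of the round trip between form 2 and form 3 described above, here is a minimal, self-contained sketch (assuming the Spark 2.x catalyst API; `BufferBytesSketch`, `toBytes`, and `fromBytes` are illustrative names, not code from this PR, and the two longs model the `MockUDAF` counts of non-null and null values):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
import org.apache.spark.sql.types.{DataType, LongType}

object BufferBytesSketch {
  // Form 3 is just the bytes of an UnsafeRow with two LongType fields.
  private val bufferTypes: Array[DataType] = Array(LongType, LongType)
  private val toUnsafe = UnsafeProjection.create(bufferTypes)

  // Form 2 (two longs: non-null count, null count) -> form 3 (byte array).
  def toBytes(nonNullCount: Long, nullCount: Long): Array[Byte] =
    toUnsafe(InternalRow(nonNullCount, nullCount)).getBytes

  // Form 3 -> form 2: re-point an UnsafeRow at the bytes and read the fields back.
  def fromBytes(bytes: Array[Byte]): (Long, Long) = {
    val row = new UnsafeRow(2)
    row.pointTo(bytes, bytes.length)
    (row.getLong(0), row.getLong(1))
  }
}
```

Going all the way back to form 1 then follows the recipe above: create an empty `AggregationBuffer` and merge the unwrapped form 2 value into it.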

Here are some micro-benchmark results produced by the most recent master and this PR branch.

Master:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

hive udaf vs spark af:       Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------
w/o groupBy                        339 /  372          3.1         323.2       1.0X
w/ groupBy                         503 /  529          2.1         479.7       0.7X
```

This PR:

```
Java HotSpot(TM) 64-Bit Server VM 1.8.0_92-b14 on Mac OS X 10.10.5
Intel(R) Core(TM) i7-4960HQ CPU @ 2.60GHz

hive udaf vs spark af:   Best/Avg Time(ms)

[GitHub] spark issue #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedImperativ...

2016-11-16 Thread yhuai
Github user yhuai commented on the issue:

https://github.com/apache/spark/pull/15703
  
Code changes look good to me. Let's also do a benchmark to sanity-check our implementation.
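
For example, a quick wall-clock sanity check could reuse the `hive_max` registration from the PR description (a rough sketch for a Hive-enabled `spark-shell`, not the benchmark harness that produced the numbers in the commit message above):

```scala
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFMax

sql(s"CREATE FUNCTION hive_max AS '${classOf[GenericUDAFMax].getName}'")
spark.range(10 * 1000 * 1000).createOrReplaceTempView("t")

// Crude wall-clock timing of the two query shapes measured in the commit message.
def time[T](label: String)(body: => T): T = {
  val start = System.nanoTime()
  val result = body
  println(f"$label: ${(System.nanoTime() - start) / 1e6}%.1f ms")
  result
}

time("w/o groupBy") { sql("SELECT hive_max(id) FROM t").collect() }
time("w/ groupBy") { sql("SELECT id % 10 AS k, hive_max(id) FROM t GROUP BY id % 10").collect() }
```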





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88326725
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
   funcWrapper.createFunction[AbstractGenericUDAFResolver]()
 }
 
+  // Hive `ObjectInspector`s for all child expressions (input parameters 
of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray
 
+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, 
false, false)
-val f = resolver.getEvaluator(parameterInfo)
-f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+  private lazy val inputDataTypes: Array[DataType] = 
children.map(_.dataType).toArray
+
+  private def newEvaluator(): GenericUDAFEvaluator = {
+val parameterInfo = new 
SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+resolver.getEvaluator(parameterInfo)
   }
 
+  // The UDAF evaluator used to consume raw input rows and produce partial 
aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = newEvaluator()
--- End diff --

OK. I think in general we should avoid using this pattern. If we have to use it now, let's
explain it in a comment.





[GitHub] spark pull request #15900: [SPARK-18464][SQL] support old table which doesn'...

2016-11-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15900#discussion_r88289527
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 ---
@@ -1371,4 +1371,23 @@ class MetastoreDataSourcesSuite extends QueryTest 
with SQLTestUtils with TestHiv
   }
 }
   }
+
+  test("SPARK-18464: support old table which doesn't store schema in table 
properties") {
+withTable("old") {
+  withTempPath { path =>
+Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
+val tableDesc = CatalogTable(
+  identifier = TableIdentifier("old", Some("default")),
+  tableType = CatalogTableType.EXTERNAL,
+  storage = CatalogStorageFormat.empty.copy(
+properties = Map("path" -> path.getAbsolutePath)
+  ),
+  schema = new StructType(),
+  properties = Map(
+HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
+hiveClient.createTable(tableDesc, ignoreIfExists = false)
+checkAnswer(spark.table("old"), Row(1, "a"))
--- End diff --

Can we also test `describe table` and make sure it can provide correct 
column info?
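
For example, something along these lines could be appended to the test above (a hypothetical check, assuming `DESCRIBE` is expected to report the columns inferred from the Parquet files):

```scala
// Hypothetical addition to the SPARK-18464 test: the i/j columns come from the
// DataFrame written out above, not from the (empty) schema in table properties.
checkAnswer(
  sql("DESC old").select("col_name", "data_type"),
  Seq(Row("i", "int"), Row("j", "string")))
```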





[GitHub] spark pull request #15900: [SPARK-18464][SQL] support old table which doesn'...

2016-11-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15900#discussion_r88289294
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala ---
@@ -1023,6 +1023,11 @@ object HiveExternalCatalog {
   // After SPARK-6024, we removed this flag.
   // Although we are not using `spark.sql.sources.schema` any more, we 
need to still support.
   DataType.fromJson(schema.get).asInstanceOf[StructType]
+} else if 
(props.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty) {
+  // If there is no schema information in table properties, it means 
the schema of this table
+  // was empty when saving into metastore, which is possible in older 
version of Spark. We
+  // should respect it.
+  new StructType()
--- End diff --

btw, this function is only needed for data source tables, right?





[GitHub] spark pull request #15900: [SPARK-18464][SQL] support old table which doesn'...

2016-11-16 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15900#discussion_r88289046
  
--- Diff: 
sql/hive/src/test/scala/org/apache/spark/sql/hive/MetastoreDataSourcesSuite.scala
 ---
@@ -1371,4 +1371,23 @@ class MetastoreDataSourcesSuite extends QueryTest 
with SQLTestUtils with TestHiv
   }
 }
   }
+
+  test("SPARK-18464: support old table which doesn't store schema in table 
properties") {
+withTable("old") {
+  withTempPath { path =>
+Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
+val tableDesc = CatalogTable(
+  identifier = TableIdentifier("old", Some("default")),
+  tableType = CatalogTableType.EXTERNAL,
+  storage = CatalogStorageFormat.empty.copy(
+properties = Map("path" -> path.getAbsolutePath)
+  ),
+  schema = new StructType(),
+  properties = Map(
+HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"))
+hiveClient.createTable(tableDesc, ignoreIfExists = false)
+checkAnswer(spark.table("old"), Row(1, "a"))
+  }
+}
+  }
--- End diff --

It will be good to actually create a set of compatibility tests to make sure a new version of
Spark can access table metadata created by an older version (starting from Spark 1.3) without
problems. Let's create a follow-up JIRA for this task and do it during the QA period of
Spark 2.1.
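
A rough shape for such a suite could be to hand-craft the metastore entries the way older releases wrote them and assert the tables are still readable (an illustrative sketch only; the two property layouts below are simplified examples, not an exhaustive list):

```scala
// Each case simulates table properties an older Spark version could have written.
Seq(
  "no schema in table properties" ->
    Map(HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet"),
  "legacy single JSON schema property" ->
    Map(
      HiveExternalCatalog.DATASOURCE_PROVIDER -> "parquet",
      "spark.sql.sources.schema" ->
        new StructType().add("i", "int").add("j", "string").json)
).foreach { case (caseName, props) =>
  test(s"backward compatibility: $caseName") {
    withTable("old") {
      withTempPath { path =>
        Seq(1 -> "a").toDF("i", "j").write.parquet(path.getAbsolutePath)
        val tableDesc = CatalogTable(
          identifier = TableIdentifier("old", Some("default")),
          tableType = CatalogTableType.EXTERNAL,
          storage = CatalogStorageFormat.empty.copy(
            properties = Map("path" -> path.getAbsolutePath)),
          schema = new StructType(),
          properties = props)
        hiveClient.createTable(tableDesc, ignoreIfExists = false)
        checkAnswer(spark.table("old"), Row(1, "a"))
      }
    }
  }
}
```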





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88172060
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
 val distinct = if (isDistinct) "DISTINCT " else " "
 s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit 
= {
+partial1ModeEvaluator.iterate(
+  buffer, wrap(inputProjection(input), inputWrappers, cached, 
inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): 
Unit = {
+partial2ModeEvaluator.merge(buffer, 
partial1ModeEvaluator.terminatePartial(input))
+  }
+
+  override def eval(buffer: AggregationBuffer): Any = {
+resultUnwrapper(finalModeEvaluator.terminate(buffer))
+  }
+
+  override def serialize(buffer: AggregationBuffer): Array[Byte] = {
+aggBufferSerDe.serialize(buffer)
+  }
+
+  override def deserialize(bytes: Array[Byte]): AggregationBuffer = {
+aggBufferSerDe.deserialize(bytes)
+  }
+
+  // Helper class used to de/serialize Hive UDAF `AggregationBuffer` 
objects
+  private class AggregationBufferSerDe {
--- End diff --

Can we take this class out from HiveUDAFFunction?





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88171437
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
   funcWrapper.createFunction[AbstractGenericUDAFResolver]()
 }
 
+  // Hive `ObjectInspector`s for all child expressions (input parameters 
of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray
 
+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, 
false, false)
-val f = resolver.getEvaluator(parameterInfo)
-f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+  private lazy val inputDataTypes: Array[DataType] = 
children.map(_.dataType).toArray
+
+  private def newEvaluator(): GenericUDAFEvaluator = {
+val parameterInfo = new 
SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+resolver.getEvaluator(parameterInfo)
   }
 
+  // The UDAF evaluator used to consume raw input rows and produce partial 
aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = newEvaluator()
 
+  // Hive `ObjectInspector` used to inspect partial aggregation results.
   @transient
-  private lazy val wrappers = children.map(x => wrapperFor(toInspector(x), 
x.dataType)).toArray
+  private val partialResultInspector = partial1ModeEvaluator.init(
+GenericUDAFEvaluator.Mode.PARTIAL1,
+inputInspectors
+  )
 
+  // The UDAF evaluator used to merge partial aggregation results.
   @transient
-  private lazy val returnInspector = functionAndInspector._2
+  private lazy val partial2ModeEvaluator = {
+val evaluator = newEvaluator()
+evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, 
Array(partialResultInspector))
+evaluator
+  }
 
+  // Spark SQL data type of partial aggregation results
   @transient
-  private lazy val unwrapper = unwrapperFor(returnInspector)
+  private lazy val partialResultDataType = 
inspectorToDataType(partialResultInspector)
 
+  // The UDAF evaluator used to compute the final result from a partial 
aggregation result objects.
   @transient
-  private[this] var buffer: GenericUDAFEvaluator.AggregationBuffer = _
-
-  override def eval(input: InternalRow): Any = 
unwrapper(function.evaluate(buffer))
+  private lazy val finalModeEvaluator = newEvaluator()
 
+  // Hive `ObjectInspector` used to inspect the final aggregation result 
object.
   @transient
-  private lazy val inputProjection = new InterpretedProjection(children)
+  private val returnInspector = finalModeEvaluator.init(
+GenericUDAFEvaluator.Mode.FINAL,
+Array(partialResultInspector)
+  )
 
+  // Wrapper functions used to wrap Spark SQL input arguments into Hive 
specific format.
   @transient
-  private lazy val cached = new Array[AnyRef](children.length)
+  private lazy val inputWrappers = children.map(x => 
wrapperFor(toInspector(x), x.dataType)).toArray
 
+  // Unwrapper function used to unwrap final aggregation result objects 
returned by Hive UDAFs into
+  // Spark SQL specific format.
   @transient
-  private lazy val inputDataTypes: Array[DataType] = 
children.map(_.dataType).toArray
-
-  // Hive UDAF has its own buffer, so we don't need to occupy a slot in 
the aggregation
-  // buffer for it.
-  override def aggBufferSchema: StructType = StructType(Nil)
-
-  override def update(_buffer: InternalRow, input: InternalRow): Unit = {
-val inputs = inputProjection(input)
-function.iterate(buffer, wrap(inputs, wrappers, cached, 
inputDataTypes))
-  }
-
-  override def merge(buffer1: InternalRow, buffer2: InternalRow): Unit = {
-throw new UnsupportedOperationException(
-  "Hive UDAF doesn't support partial aggregate")
-  }
+  private lazy val resultUnwrapper = unwrapperFor(returnInspector)
 
-  override def initialize(_buffer: InternalRow): Unit = {
-buffer = function.getNewAggregationBuffer
-  }
-
-  override val aggBufferAttributes: Seq[AttributeReference] = Nil
+  @transient
+  private lazy val cached: Array[AnyRef] = new 
Array[AnyRef](children.length)
 
-  // Note: although this simply copies aggBufferAttributes, this common 
code can not be placed
-  // in the superclass because that will lead to initialization ordering 
issues.
-  override val inputAggBu

[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88171373
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
   funcWrapper.createFunction[AbstractGenericUDAFResolver]()
 }
 
+  // Hive `ObjectInspector`s for all child expressions (input parameters 
of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray
 
+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, 
false, false)
-val f = resolver.getEvaluator(parameterInfo)
-f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+  private lazy val inputDataTypes: Array[DataType] = 
children.map(_.dataType).toArray
+
+  private def newEvaluator(): GenericUDAFEvaluator = {
+val parameterInfo = new 
SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+resolver.getEvaluator(parameterInfo)
   }
 
+  // The UDAF evaluator used to consume raw input rows and produce partial 
aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = newEvaluator()
 
+  // Hive `ObjectInspector` used to inspect partial aggregation results.
--- End diff --

The partial aggregation result is the aggregation buffer, right?





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88171285
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
   funcWrapper.createFunction[AbstractGenericUDAFResolver]()
 }
 
+  // Hive `ObjectInspector`s for all child expressions (input parameters 
of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray
--- End diff --

Let's add docs to explain when these internal vals are used (like which 
vals are needed for a given mode).
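
For example, a short map from evaluator mode to the vals it relies on could be added near the top of the class (a suggested sketch; the names are the ones introduced in this PR, and the grouping below is an assumption based on the diff):

```scala
// Suggested documentation sketch:
//
//   PARTIAL1 (consume raw input rows): partial1ModeEvaluator, inputInspectors,
//     inputWrappers, inputProjection, inputDataTypes
//   PARTIAL2 (merge partial aggregation buffers): partial2ModeEvaluator,
//     partialResultInspector, partialResultDataType
//   FINAL (produce the final result): finalModeEvaluator, returnInspector,
//     resultUnwrapper
```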





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88171097
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -289,73 +302,75 @@ private[hive] case class HiveUDAFFunction(
   funcWrapper.createFunction[AbstractGenericUDAFResolver]()
 }
 
+  // Hive `ObjectInspector`s for all child expressions (input parameters 
of the function).
   @transient
-  private lazy val inspectors = children.map(toInspector).toArray
+  private lazy val inputInspectors = children.map(toInspector).toArray
 
+  // Spark SQL data types of input parameters.
   @transient
-  private lazy val functionAndInspector = {
-val parameterInfo = new SimpleGenericUDAFParameterInfo(inspectors, 
false, false)
-val f = resolver.getEvaluator(parameterInfo)
-f -> f.init(GenericUDAFEvaluator.Mode.COMPLETE, inspectors)
+  private lazy val inputDataTypes: Array[DataType] = 
children.map(_.dataType).toArray
+
+  private def newEvaluator(): GenericUDAFEvaluator = {
+val parameterInfo = new 
SimpleGenericUDAFParameterInfo(inputInspectors, false, false)
+resolver.getEvaluator(parameterInfo)
   }
 
+  // The UDAF evaluator used to consume raw input rows and produce partial 
aggregation results.
   @transient
-  private lazy val function = functionAndInspector._1
+  private lazy val partial1ModeEvaluator = newEvaluator()
--- End diff --

Should we always call `init` to make the code consistent?





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88170694
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -263,8 +265,19 @@ private[hive] case class HiveGenericUDTF(
 }
 
 /**
- * Currently we don't support partial aggregation for queries using Hive 
UDAF, which may hurt
- * performance a lot.
+ * While being evaluated by Spark SQL, the aggregation state of a Hive 
UDAF may be in the following
+ * three formats:
+ *
+ *  1. a Spark SQL value, or
+ *  2. an instance of some concrete 
`GenericUDAFEvaluator.AggregationBuffer` class, or
+ *  3. a Java object that can be inspected using the `ObjectInspector` 
returned by the
+ * `GenericUDAFEvaluator.init()` method.
--- End diff --

(is the doc below enough?)





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88170550
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -263,8 +265,19 @@ private[hive] case class HiveGenericUDTF(
 }
 
 /**
- * Currently we don't support partial aggregation for queries using Hive 
UDAF, which may hurt
- * performance a lot.
+ * While being evaluated by Spark SQL, the aggregation state of a Hive 
UDAF may be in the following
+ * three formats:
+ *
+ *  1. a Spark SQL value, or
+ *  2. an instance of some concrete 
`GenericUDAFEvaluator.AggregationBuffer` class, or
+ *  3. a Java object that can be inspected using the `ObjectInspector` 
returned by the
+ * `GenericUDAFEvaluator.init()` method.
--- End diff --

(we can just put the PR description here)





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88168751
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -263,8 +265,19 @@ private[hive] case class HiveGenericUDTF(
 }
 
 /**
- * Currently we don't support partial aggregation for queries using Hive 
UDAF, which may hurt
- * performance a lot.
+ * While being evaluated by Spark SQL, the aggregation state of a Hive 
UDAF may be in the following
+ * three formats:
+ *
+ *  1. a Spark SQL value, or
+ *  2. an instance of some concrete 
`GenericUDAFEvaluator.AggregationBuffer` class, or
+ *  3. a Java object that can be inspected using the `ObjectInspector` 
returned by the
+ * `GenericUDAFEvaluator.init()` method.
--- End diff --

Besides explaining what these three formats are, let's also explain when we will use each of
them.
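
For example, the class doc could add a line per format, along these lines (a suggested wording based on the commit message above, not the final text):

```scala
/**
 *  1. a Spark SQL value: used when reading input arguments and when returning the
 *     final result back to Spark SQL;
 *  2. an instance of some concrete `GenericUDAFEvaluator.AggregationBuffer` class:
 *     the in-memory state that `iterate()`, `merge()` and `terminate()` operate on;
 *  3. a Java object inspectable via the `ObjectInspector` returned by
 *     `GenericUDAFEvaluator.init()`: the partial result produced by
 *     `terminatePartial()` and shipped between partial and final aggregation.
 */
```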





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88140760
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
 val distinct = if (isDistinct) "DISTINCT " else " "
 s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
--- End diff --

Why `InterpretedProjection`?





[GitHub] spark pull request #15703: [SPARK-18186] Migrate HiveUDAFFunction to TypedIm...

2016-11-15 Thread yhuai
Github user yhuai commented on a diff in the pull request:

https://github.com/apache/spark/pull/15703#discussion_r88140970
  
--- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUDFs.scala 
---
@@ -365,4 +380,66 @@ private[hive] case class HiveUDAFFunction(
 val distinct = if (isDistinct) "DISTINCT " else " "
 s"$name($distinct${children.map(_.sql).mkString(", ")})"
   }
+
+  override def createAggregationBuffer(): AggregationBuffer =
+partial1ModeEvaluator.getNewAggregationBuffer
+
+  @transient
+  private lazy val inputProjection = new InterpretedProjection(children)
+
+  override def update(buffer: AggregationBuffer, input: InternalRow): Unit 
= {
+partial1ModeEvaluator.iterate(
+  buffer, wrap(inputProjection(input), inputWrappers, cached, 
inputDataTypes))
+  }
+
+  override def merge(buffer: AggregationBuffer, input: AggregationBuffer): 
Unit = {
+partial2ModeEvaluator.merge(buffer, 
partial1ModeEvaluator.terminatePartial(input))
--- End diff --

Let's explain what we are trying to do with `partial1ModeEvaluator.terminatePartial(input)`.
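
For example, a comment along these lines could be added above the call (a suggested wording based on the state-form description in the commit message, not the final text):

```scala
override def merge(buffer: AggregationBuffer, input: AggregationBuffer): Unit = {
  // `input` is an AggregationBuffer (form 1) built up on another partition.
  // `terminatePartial` converts it into the Object[] (form 2) representation
  // that the PARTIAL2-mode evaluator's `merge` expects before folding it into
  // `buffer`.
  partial2ModeEvaluator.merge(buffer, partial1ModeEvaluator.terminatePartial(input))
}
```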




