spark git commit: [SPARK-18016][SQL][CATALYST] Code Generation: Constant Pool Limit - Class Splitting

2017-06-14 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 205142817 -> b32b2123d


[SPARK-18016][SQL][CATALYST] Code Generation: Constant Pool Limit - Class 
Splitting

## What changes were proposed in this pull request?

This pull request includes only the class-splitting feature described in 
#16648. When the code generated for a given class would grow beyond 1600k 
bytes, a private, nested sub-class is generated, into which subsequent 
functions are inlined. Additional sub-classes are generated each time the code 
threshold is reached again. The change has 3 parts (a sketch of the resulting 
bookkeeping follows the list):

1. Includes helper maps, lists, and functions for keeping track of sub-classes 
during code generation (included in the `CodeGenerator` class). These helper 
functions allow nested classes and split functions to be 
initialized/declared/inlined to the appropriate locations in the various 
projection classes.
2. Changes `addNewFunction` to return a string to support instances where a 
split function is inlined to a nested class and not the outer class (and so 
must be invoked using the class-qualified name). Uses of `addNewFunction` 
throughout the codebase are modified so that the returned name is properly used.
3. Removes instances of the `this` keyword when used on data inside generated 
classes. All state declared in the outer class is by default global and 
accessible to the nested classes. However, if a reference to global state in a 
nested class is prepended with the `this` keyword, it would attempt to 
reference state belonging to the nested class (which would not exist), rather 
than the correct variable belonging to the outer class.
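
The following is a minimal, self-contained sketch of the bookkeeping described 
in parts 1 and 2; the names (`ClassSplittingSketch`, `NestedClassN`, and this 
`addNewFunction` signature) are hypothetical and do not reproduce Spark's 
actual `CodeGenerator` internals.

```scala
// Sketch only: functions accumulate in the outer class until a size threshold
// is crossed, later functions go into numbered nested classes, and
// addNewFunction returns the (possibly class-qualified) name callers must use.
object ClassSplittingSketch {
  import scala.collection.mutable.ArrayBuffer

  private val threshold = 1600 * 1024               // code-size limit per class, as in the PR text
  private var outerSize = 0                         // code already placed in the outer class
  private val nestedSizes = ArrayBuffer.empty[Int]  // code placed in each generated nested class

  /** Returns the name to use at call sites: bare when the function stays in the
   *  outer class, "NestedClassN.name" when it is inlined into a nested class. */
  def addNewFunction(name: String, code: String): String = {
    if (outerSize + code.length <= threshold) {
      outerSize += code.length
      name
    } else {
      if (nestedSizes.isEmpty || nestedSizes.last + code.length > threshold) {
        nestedSizes += 0                            // open another nested sub-class
      }
      nestedSizes(nestedSizes.length - 1) += code.length
      s"NestedClass${nestedSizes.length}.$name"
    }
  }

  def main(args: Array[String]): Unit = {
    val body = "x" * (900 * 1024)
    println(addNewFunction("apply_1", body))        // apply_1
    println(addNewFunction("apply_2", body))        // NestedClass1.apply_2
    println(addNewFunction("apply_3", body))        // NestedClass2.apply_3
  }
}
```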

## How was this patch tested?

Added a test case to the `GeneratedProjectionSuite` that increases the number 
of columns tested in various projections to a threshold that would previously 
have triggered a `JaninoRuntimeException` for the Constant Pool.

Note: This PR does not address the second Constant Pool issue with code 
generation (also mentioned in #16648): excess global mutable state. A second PR 
may be opened to resolve that issue.

Author: ALeksander Eskilson 

Closes #18075 from bdrillard/class_splitting_only.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b32b2123
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b32b2123
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b32b2123

Branch: refs/heads/master
Commit: b32b2123ddca66e00acf4c9d956232e07f779f9f
Parents: 2051428
Author: ALeksander Eskilson 
Authored: Thu Jun 15 13:45:08 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jun 15 13:45:08 2017 +0800

--
 sql/catalyst/pom.xml|   7 +
 .../sql/catalyst/expressions/ScalaUDF.scala |   6 +-
 .../expressions/codegen/CodeGenerator.scala | 140 ---
 .../codegen/GenerateMutableProjection.scala |  17 ++-
 .../expressions/codegen/GenerateOrdering.scala  |   3 +
 .../expressions/codegen/GeneratePredicate.scala |   3 +
 .../codegen/GenerateSafeProjection.scala|   9 +-
 .../codegen/GenerateUnsafeProjection.scala  |   9 +-
 .../expressions/complexTypeCreator.scala|   6 +-
 .../expressions/conditionalExpressions.scala|   4 +-
 .../sql/catalyst/expressions/generators.scala   |   6 +-
 .../catalyst/expressions/objects/objects.scala  |   2 +-
 .../codegen/GeneratedProjectionSuite.scala  |  72 --
 sql/core/pom.xml|   7 +
 .../spark/sql/execution/ColumnarBatchScan.scala |   6 +-
 .../apache/spark/sql/execution/SortExec.scala   |   4 +-
 .../sql/execution/WholeStageCodegenExec.scala   |   3 +
 .../execution/aggregate/HashAggregateExec.scala |   8 +-
 .../sql/execution/basicPhysicalOperators.scala  |  11 +-
 .../columnar/GenerateColumnAccessor.scala   |  13 +-
 .../sql/execution/joins/SortMergeJoinExec.scala |   2 +-
 .../org/apache/spark/sql/execution/limit.scala  |   2 +-
 22 files changed, 259 insertions(+), 81 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/pom.xml
--
diff --git a/sql/catalyst/pom.xml b/sql/catalyst/pom.xml
index 8d80f8e..36948ba 100644
--- a/sql/catalyst/pom.xml
+++ b/sql/catalyst/pom.xml
@@ -132,6 +132,13 @@
 
   
   
+        <groupId>org.scalatest</groupId>
+        <artifactId>scalatest-maven-plugin</artifactId>
+        <configuration>
+          <argLine>-Xmx4g -Xss4096k -XX:MaxPermSize=${MaxPermGen} -XX:ReservedCodeCacheSize=512m</argLine>
+        </configuration>
+      </plugin>
+      <plugin>
         <groupId>org.antlr</groupId>
         <artifactId>antlr4-maven-plugin</artifactId>
 

http://git-wip-us.apache.org/repos/asf/spark/blob/b32b2123/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ScalaUDF.scala

spark git commit: [SPARK-20980][SQL] Rename `wholeFile` to `multiLine` for both CSV and JSON

2017-06-14 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 e02e0637f -> af4f89c98


[SPARK-20980][SQL] Rename `wholeFile` to `multiLine` for both CSV and JSON

The current option name `wholeFile` is misleading for CSV users: it does not 
mean one record per file, since a single file can contain multiple records. We 
should therefore rename it; the proposed new name is `multiLine`.

N/A

Author: Xiao Li 

Closes #18202 from gatorsmile/renameCVSOption.

(cherry picked from commit 2051428173d8cd548702eb1a2e1c1ca76b8f2fd5)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/af4f89c9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/af4f89c9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/af4f89c9

Branch: refs/heads/branch-2.2
Commit: af4f89c9815ddbd84ed9f3917765d26efd171483
Parents: e02e063
Author: Xiao Li 
Authored: Thu Jun 15 13:18:19 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jun 15 13:20:36 2017 +0800

--
 R/pkg/R/SQLContext.R|  6 ++---
 python/pyspark/sql/readwriter.py| 14 ++--
 python/pyspark/sql/streaming.py | 14 ++--
 python/pyspark/sql/tests.py |  8 +++
 .../spark/sql/catalyst/json/JSONOptions.scala   |  2 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |  6 ++---
 .../datasources/csv/CSVDataSource.scala |  6 ++---
 .../execution/datasources/csv/CSVOptions.scala  |  2 +-
 .../datasources/json/JsonDataSource.scala   |  6 ++---
 .../spark/sql/streaming/DataStreamReader.scala  |  6 ++---
 .../execution/datasources/csv/CSVSuite.scala| 24 ++--
 .../execution/datasources/json/JsonSuite.scala  | 14 ++--
 12 files changed, 54 insertions(+), 54 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/af4f89c9/R/pkg/R/SQLContext.R
--
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index f5c3a74..e3528bc 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -334,7 +334,7 @@ setMethod("toDF", signature(x = "RDD"),
 #'
 #' Loads a JSON file, returning the result as a SparkDataFrame
 #' By default, (\href{http://jsonlines.org/}{JSON Lines text format or 
newline-delimited JSON}
-#' ) is supported. For JSON (one record per file), set a named property 
\code{wholeFile} to
+#' ) is supported. For JSON (one record per file), set a named property 
\code{multiLine} to
 #' \code{TRUE}.
 #' It goes through the entire dataset once to determine the schema.
 #'
@@ -348,7 +348,7 @@ setMethod("toDF", signature(x = "RDD"),
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
-#' df <- read.json(path, wholeFile = TRUE)
+#' df <- read.json(path, multiLine = TRUE)
 #' df <- jsonFile(path)
 #' }
 #' @name read.json
@@ -598,7 +598,7 @@ tableToDF <- function(tableName) {
 #' df1 <- read.df("path/to/file.json", source = "json")
 #' schema <- structType(structField("name", "string"),
 #'  structField("info", "map"))
-#' df2 <- read.df(mapTypeJsonPath, "json", schema, wholeFile = TRUE)
+#' df2 <- read.df(mapTypeJsonPath, "json", schema, multiLine = TRUE)
 #' df3 <- loadDF("data/test_table", "parquet", mergeSchema = "true")
 #' }
 #' @name read.df

http://git-wip-us.apache.org/repos/asf/spark/blob/af4f89c9/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 960fb88..4caad8f 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -169,12 +169,12 @@ class DataFrameReader(OptionUtils):
  allowComments=None, allowUnquotedFieldNames=None, 
allowSingleQuotes=None,
  allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
  mode=None, columnNameOfCorruptRecord=None, dateFormat=None, 
timestampFormat=None,
- wholeFile=None):
+ multiLine=None):
 """
 Loads JSON files and returns the results as a :class:`DataFrame`.
 
 `JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is 
supported by default.
-For JSON (one record per file), set the ``wholeFile`` parameter to 
``true``.
+For JSON (one record per file), set the ``multiLine`` parameter to 
``true``.
 
 If the ``schema`` parameter is not specified, this function goes
 through the input once to determine the input schema.
@@ -224,7 +224,7 @@ class DataFrameReader(OptionUtils):
 formats follow the formats at 

spark git commit: [SPARK-20980][SQL] Rename `wholeFile` to `multiLine` for both CSV and JSON

2017-06-14 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master fffeb6d7c -> 205142817


[SPARK-20980][SQL] Rename `wholeFile` to `multiLine` for both CSV and JSON

### What changes were proposed in this pull request?
The current option name `wholeFile` is misleading for CSV users: it does not 
mean one record per file, since a single file can contain multiple records. We 
should therefore rename it; the proposed new name is `multiLine`.
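
A minimal usage sketch of the renamed option in the Scala API (file paths are 
placeholders):

```scala
import org.apache.spark.sql.SparkSession

object MultiLineReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("multiLine-example")
      .master("local[*]")
      .getOrCreate()

    // Previously spelled "wholeFile"; a single record may now span multiple lines.
    val json = spark.read.option("multiLine", "true").json("path/to/file.json")
    val csv = spark.read
      .option("multiLine", "true")
      .option("header", "true")
      .csv("path/to/file.csv")

    json.printSchema()
    csv.printSchema()
    spark.stop()
  }
}
```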

### How was this patch tested?
N/A

Author: Xiao Li 

Closes #18202 from gatorsmile/renameCVSOption.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20514281
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20514281
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20514281

Branch: refs/heads/master
Commit: 2051428173d8cd548702eb1a2e1c1ca76b8f2fd5
Parents: fffeb6d
Author: Xiao Li 
Authored: Thu Jun 15 13:18:19 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jun 15 13:18:19 2017 +0800

--
 R/pkg/R/SQLContext.R|  6 ++---
 python/pyspark/sql/readwriter.py| 14 ++--
 python/pyspark/sql/streaming.py | 14 ++--
 python/pyspark/sql/tests.py |  8 +++
 .../spark/sql/catalyst/json/JSONOptions.scala   |  2 +-
 .../org/apache/spark/sql/DataFrameReader.scala  |  6 ++---
 .../datasources/csv/CSVDataSource.scala |  6 ++---
 .../execution/datasources/csv/CSVOptions.scala  |  2 +-
 .../datasources/json/JsonDataSource.scala   |  6 ++---
 .../spark/sql/streaming/DataStreamReader.scala  |  6 ++---
 .../execution/datasources/csv/CSVSuite.scala| 24 ++--
 .../execution/datasources/json/JsonSuite.scala  | 14 ++--
 12 files changed, 54 insertions(+), 54 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/20514281/R/pkg/R/SQLContext.R
--
diff --git a/R/pkg/R/SQLContext.R b/R/pkg/R/SQLContext.R
index f5c3a74..e3528bc 100644
--- a/R/pkg/R/SQLContext.R
+++ b/R/pkg/R/SQLContext.R
@@ -334,7 +334,7 @@ setMethod("toDF", signature(x = "RDD"),
 #'
 #' Loads a JSON file, returning the result as a SparkDataFrame
 #' By default, (\href{http://jsonlines.org/}{JSON Lines text format or 
newline-delimited JSON}
-#' ) is supported. For JSON (one record per file), set a named property 
\code{wholeFile} to
+#' ) is supported. For JSON (one record per file), set a named property 
\code{multiLine} to
 #' \code{TRUE}.
 #' It goes through the entire dataset once to determine the schema.
 #'
@@ -348,7 +348,7 @@ setMethod("toDF", signature(x = "RDD"),
 #' sparkR.session()
 #' path <- "path/to/file.json"
 #' df <- read.json(path)
-#' df <- read.json(path, wholeFile = TRUE)
+#' df <- read.json(path, multiLine = TRUE)
 #' df <- jsonFile(path)
 #' }
 #' @name read.json
@@ -598,7 +598,7 @@ tableToDF <- function(tableName) {
 #' df1 <- read.df("path/to/file.json", source = "json")
 #' schema <- structType(structField("name", "string"),
 #'  structField("info", "map"))
-#' df2 <- read.df(mapTypeJsonPath, "json", schema, wholeFile = TRUE)
+#' df2 <- read.df(mapTypeJsonPath, "json", schema, multiLine = TRUE)
 #' df3 <- loadDF("data/test_table", "parquet", mergeSchema = "true")
 #' }
 #' @name read.df

http://git-wip-us.apache.org/repos/asf/spark/blob/20514281/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index 5cf719b..aef71f9 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -174,12 +174,12 @@ class DataFrameReader(OptionUtils):
  allowComments=None, allowUnquotedFieldNames=None, 
allowSingleQuotes=None,
  allowNumericLeadingZero=None, 
allowBackslashEscapingAnyCharacter=None,
  mode=None, columnNameOfCorruptRecord=None, dateFormat=None, 
timestampFormat=None,
- wholeFile=None):
+ multiLine=None):
 """
 Loads JSON files and returns the results as a :class:`DataFrame`.
 
  `JSON Lines <http://jsonlines.org/>`_ (newline-delimited JSON) is 
supported by default.
-For JSON (one record per file), set the ``wholeFile`` parameter to 
``true``.
+For JSON (one record per file), set the ``multiLine`` parameter to 
``true``.
 
 If the ``schema`` parameter is not specified, this function goes
 through the input once to determine the input schema.
@@ -230,7 +230,7 @@ class DataFrameReader(OptionUtils):
 formats follow the formats at 
``java.text.SimpleDateFormat``.
  

spark git commit: [SPARK-21092][SQL] Wire SQLConf in logical plan and expressions

2017-06-14 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 292467440 -> fffeb6d7c


[SPARK-21092][SQL] Wire SQLConf in logical plan and expressions

## What changes were proposed in this pull request?
It is really painful to not have configs in logical plan and expressions. We 
had to add all sorts of hacks (e.g. pass SQLConf explicitly in functions). This 
patch exposes SQLConf in logical plan, using a thread local variable and a 
getter closure that's set once there is an active SparkSession.

The implementation is a bit of a hack, since we didn't anticipate this need in 
the beginning (config was only exposed in physical plan). The implementation is 
described in `SQLConf.get`.
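
A minimal sketch of the thread-local getter pattern described above, with 
hypothetical names rather than the actual `SQLConf` internals:

```scala
// Sketch only: a per-thread getter closure with a default fallback, so rules
// can read the active config without it being threaded through as a parameter.
object ConfAccessSketch {
  final class Conf(val constraintPropagationEnabled: Boolean)

  private val fallbackConf = new Conf(constraintPropagationEnabled = true)

  // Each thread holds a closure returning "its" config; defaults to the fallback.
  private val confGetter = new ThreadLocal[() => Conf] {
    override def initialValue(): () => Conf = () => fallbackConf
  }

  /** Installed once an active session exists on this thread. */
  def setGetter(getter: () => Conf): Unit = confGetter.set(getter)

  /** What a rule would call instead of receiving a conf argument. */
  def get: Conf = confGetter.get()()

  def main(args: Array[String]): Unit = {
    println(get.constraintPropagationEnabled)                      // true (fallback)
    setGetter(() => new Conf(constraintPropagationEnabled = false))
    println(get.constraintPropagationEnabled)                      // false (session-provided)
  }
}
```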

In terms of future work, we should follow up to clean up CBO (remove the need 
for passing in config).

## How was this patch tested?
Updated relevant tests for constraint propagation.

Author: Reynold Xin 

Closes #18299 from rxin/SPARK-21092.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fffeb6d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fffeb6d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fffeb6d7

Branch: refs/heads/master
Commit: fffeb6d7c37ee673a32584f3b2fd3afe86af793a
Parents: 2924674
Author: Reynold Xin 
Authored: Wed Jun 14 22:11:41 2017 -0700
Committer: Reynold Xin 
Committed: Wed Jun 14 22:11:41 2017 -0700

--
 .../sql/catalyst/optimizer/Optimizer.scala  | 25 ++--
 .../spark/sql/catalyst/optimizer/joins.scala|  5 +--
 .../spark/sql/catalyst/plans/QueryPlan.scala|  3 ++
 .../catalyst/plans/QueryPlanConstraints.scala   | 33 +--
 .../org/apache/spark/sql/internal/SQLConf.scala | 42 
 .../BinaryComparisonSimplificationSuite.scala   |  2 +-
 .../optimizer/BooleanSimplificationSuite.scala  |  2 +-
 .../InferFiltersFromConstraintsSuite.scala  | 24 +--
 .../optimizer/OuterJoinEliminationSuite.scala   | 37 -
 .../optimizer/PropagateEmptyRelationSuite.scala |  4 +-
 .../catalyst/optimizer/PruneFiltersSuite.scala  | 36 +++--
 .../catalyst/optimizer/SetOperationSuite.scala  |  2 +-
 .../plans/ConstraintPropagationSuite.scala  | 29 +-
 .../org/apache/spark/sql/SparkSession.scala |  5 +++
 14 files changed, 141 insertions(+), 108 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fffeb6d7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d16689a..3ab70fb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -77,12 +77,12 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, 
conf: SQLConf)
   // Operator push down
   PushProjectionThroughUnion,
   ReorderJoin(conf),
-  EliminateOuterJoin(conf),
+  EliminateOuterJoin,
   PushPredicateThroughJoin,
   PushDownPredicate,
   LimitPushDown(conf),
   ColumnPruning,
-  InferFiltersFromConstraints(conf),
+  InferFiltersFromConstraints,
   // Operator combine
   CollapseRepartition,
   CollapseProject,
@@ -102,7 +102,7 @@ abstract class Optimizer(sessionCatalog: SessionCatalog, 
conf: SQLConf)
   SimplifyConditionals,
   RemoveDispensableExpressions,
   SimplifyBinaryComparison,
-  PruneFilters(conf),
+  PruneFilters,
   EliminateSorts,
   SimplifyCasts,
   SimplifyCaseConversionExpressions,
@@ -619,14 +619,15 @@ object CollapseWindow extends Rule[LogicalPlan] {
  * Note: While this optimization is applicable to all types of join, it 
primarily benefits Inner and
  * LeftSemi joins.
  */
-case class InferFiltersFromConstraints(conf: SQLConf)
-extends Rule[LogicalPlan] with PredicateHelper {
-  def apply(plan: LogicalPlan): LogicalPlan = if 
(conf.constraintPropagationEnabled) {
-inferFilters(plan)
-  } else {
-plan
-  }
+object InferFiltersFromConstraints extends Rule[LogicalPlan] with 
PredicateHelper {
 
+  def apply(plan: LogicalPlan): LogicalPlan = {
+if (SQLConf.get.constraintPropagationEnabled) {
+  inferFilters(plan)
+} else {
+  plan
+}
+  }
 
   private def inferFilters(plan: LogicalPlan): LogicalPlan = plan transform {
 case filter @ Filter(condition, child) =>
@@ -717,7 +718,7 @@ object EliminateSorts extends Rule[LogicalPlan] {
  * 2) by substituting a dummy empty relation when the filter will always 

spark git commit: [SPARK-19900][CORE] Remove driver when relaunching.

2017-06-14 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master e254e868f -> 292467440


[SPARK-19900][CORE] Remove driver when relaunching.

This is https://github.com/apache/spark/pull/17888 .

Below are some spark ui snapshots.

Master, after worker disconnects:

https://cloud.githubusercontent.com/assets/2576762/26398687/d0ee228e-40ac-11e7-986d-d3b57b87029f.png

Master, after worker reconnects, notice the `running drivers` part:

https://cloud.githubusercontent.com/assets/2576762/26398697/d50735a4-40ac-11e7-80d8-6e9e1cf0b62f.png

This patch, after worker disconnects:
https://cloud.githubusercontent.com/assets/2576762/26398009/c015d3dc-40aa-11e7-8bb4-df11a1f66645.png

This patch, after worker reconnects:
![image](https://cloud.githubusercontent.com/assets/2576762/26398037/d313769c-40aa-11e7-8613-5f157d193150.png)

cc cloud-fan jiangxb1987

Author: Li Yichao 

Closes #18084 from liyichao/SPARK-19900-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/29246744
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/29246744
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/29246744

Branch: refs/heads/master
Commit: 29246744061ee96afd5f57e113ad69c354e4ba4a
Parents: e254e86
Author: Li Yichao 
Authored: Thu Jun 15 08:08:26 2017 +0800
Committer: Wenchen Fan 
Committed: Thu Jun 15 08:08:26 2017 +0800

--
 .../org/apache/spark/deploy/master/Master.scala |  16 ++-
 .../spark/deploy/master/MasterSuite.scala   | 109 +++
 2 files changed, 122 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/29246744/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
--
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index b78ae1f..f10a412 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -799,9 +799,19 @@ private[deploy] class Master(
   }
 
   private def relaunchDriver(driver: DriverInfo) {
-driver.worker = None
-driver.state = DriverState.RELAUNCHING
-waitingDrivers += driver
+// We must setup a new driver with a new driver id here, because the 
original driver may
+// be still running. Consider this scenario: a worker is network 
partitioned with master,
+// the master then relaunches driver driverID1 with a driver id driverID2, 
then the worker
+// reconnects to master. From this point on, if driverID2 is equal to 
driverID1, then master
+// can not distinguish the statusUpdate of the original driver and the 
newly relaunched one,
+// for example, when DriverStateChanged(driverID1, KILLED) arrives at 
master, master will
+// remove driverID1, so the newly relaunched driver disappears too. See 
SPARK-19900 for details.
+removeDriver(driver.id, DriverState.RELAUNCHING, None)
+val newDriver = createDriver(driver.desc)
+persistenceEngine.addDriver(newDriver)
+drivers.add(newDriver)
+waitingDrivers += newDriver
+
 schedule()
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/29246744/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala 
b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
index 4f432e4..6bb0eec 100644
--- a/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/master/MasterSuite.scala
@@ -19,8 +19,10 @@ package org.apache.spark.deploy.master
 
 import java.util.Date
 import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.{HashMap, HashSet}
 import scala.concurrent.duration._
 import scala.io.Source
@@ -40,6 +42,49 @@ import org.apache.spark.deploy.DeployMessages._
 import org.apache.spark.rpc.{RpcAddress, RpcEndpoint, RpcEndpointRef, RpcEnv}
 import org.apache.spark.serializer
 
+object MockWorker {
+  val counter = new AtomicInteger(1)
+}
+
+class MockWorker(master: RpcEndpointRef, conf: SparkConf = new SparkConf) 
extends RpcEndpoint {
+  val seq = MockWorker.counter.incrementAndGet()
+  val id = seq.toString
+  override val rpcEnv: RpcEnv = RpcEnv.create("worker", "localhost", seq,
+conf, new SecurityManager(conf))
+  var apps = new mutable.HashMap[String, String]()
+  val driverIdToAppId = new mutable.HashMap[String, String]()
+  def 

spark git commit: [SPARK-21091][SQL] Move constraint code into QueryPlanConstraints

2017-06-14 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 77a2fc5b5 -> e254e868f


[SPARK-21091][SQL] Move constraint code into QueryPlanConstraints

## What changes were proposed in this pull request?
This patch moves constraint-related code into a separate trait, 
QueryPlanConstraints, so we don't litter QueryPlan with a lot of 
constraint-specific private functions.

## How was this patch tested?
This is a simple move refactoring and should be covered by existing tests.

Author: Reynold Xin 

Closes #18298 from rxin/SPARK-21091.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e254e868
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e254e868
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e254e868

Branch: refs/heads/master
Commit: e254e868f1e640b59d8d3e0e01a5e0c488dd6e70
Parents: 77a2fc5
Author: Reynold Xin 
Authored: Wed Jun 14 14:28:21 2017 -0700
Committer: Reynold Xin 
Committed: Wed Jun 14 14:28:21 2017 -0700

--
 .../spark/sql/catalyst/plans/QueryPlan.scala| 187 +
 .../catalyst/plans/QueryPlanConstraints.scala   | 206 +++
 2 files changed, 210 insertions(+), 183 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e254e868/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
index 5ba043e..8bc462e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/QueryPlan.scala
@@ -21,194 +21,15 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.trees.TreeNode
 import org.apache.spark.sql.types.{DataType, StructType}
 
-abstract class QueryPlan[PlanType <: QueryPlan[PlanType]] extends 
TreeNode[PlanType] {
+abstract class QueryPlan[PlanType <: QueryPlan[PlanType]]
+  extends TreeNode[PlanType]
+  with QueryPlanConstraints[PlanType] {
+
   self: PlanType =>
 
   def output: Seq[Attribute]
 
   /**
-   * Extracts the relevant constraints from a given set of constraints based 
on the attributes that
-   * appear in the [[outputSet]].
-   */
-  protected def getRelevantConstraints(constraints: Set[Expression]): 
Set[Expression] = {
-constraints
-  .union(inferAdditionalConstraints(constraints))
-  .union(constructIsNotNullConstraints(constraints))
-  .filter(constraint =>
-constraint.references.nonEmpty && 
constraint.references.subsetOf(outputSet) &&
-  constraint.deterministic)
-  }
-
-  /**
-   * Infers a set of `isNotNull` constraints from null intolerant expressions 
as well as
-   * non-nullable attributes. For e.g., if an expression is of the form (`a > 
5`), this
-   * returns a constraint of the form `isNotNull(a)`
-   */
-  private def constructIsNotNullConstraints(constraints: Set[Expression]): 
Set[Expression] = {
-// First, we propagate constraints from the null intolerant expressions.
-var isNotNullConstraints: Set[Expression] = 
constraints.flatMap(inferIsNotNullConstraints)
-
-// Second, we infer additional constraints from non-nullable attributes 
that are part of the
-// operator's output
-val nonNullableAttributes = output.filterNot(_.nullable)
-isNotNullConstraints ++= nonNullableAttributes.map(IsNotNull).toSet
-
-isNotNullConstraints -- constraints
-  }
-
-  /**
-   * Infer the Attribute-specific IsNotNull constraints from the null 
intolerant child expressions
-   * of constraints.
-   */
-  private def inferIsNotNullConstraints(constraint: Expression): 
Seq[Expression] =
-constraint match {
-  // When the root is IsNotNull, we can push IsNotNull through the child 
null intolerant
-  // expressions
-  case IsNotNull(expr) => 
scanNullIntolerantAttribute(expr).map(IsNotNull(_))
-  // Constraints always return true for all the inputs. That means, null 
will never be returned.
-  // Thus, we can infer `IsNotNull(constraint)`, and also push IsNotNull 
through the child
-  // null intolerant expressions.
-  case _ => scanNullIntolerantAttribute(constraint).map(IsNotNull(_))
-}
-
-  /**
-   * Recursively explores the expressions which are null intolerant and 
returns all attributes
-   * in these expressions.
-   */
-  private def scanNullIntolerantAttribute(expr: Expression): Seq[Attribute] = 
expr match {
-case a: Attribute => Seq(a)
-case _: NullIntolerant => 
expr.children.flatMap(scanNullIntolerantAttribute)
-case _ => Seq.empty[Attribute]
-  }
-

spark git commit: Revert "[SPARK-20941][SQL] Fix SubqueryExec Reuse"

2017-06-14 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master df766a471 -> 77a2fc5b5


Revert "[SPARK-20941][SQL] Fix SubqueryExec Reuse"

This reverts commit f7cf2096fdecb8edab61c8973c07c6fc877ee32d.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77a2fc5b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77a2fc5b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77a2fc5b

Branch: refs/heads/master
Commit: 77a2fc5b521788b406bb32bcc3c637c1d7406e58
Parents: df766a4
Author: Xiao Li 
Authored: Wed Jun 14 11:48:32 2017 -0700
Committer: Xiao Li 
Committed: Wed Jun 14 11:48:32 2017 -0700

--
 .../org/apache/spark/sql/internal/SQLConf.scala |  8 -
 .../sql/execution/basicPhysicalOperators.scala  |  3 --
 .../apache/spark/sql/execution/subquery.scala   |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala| 35 
 4 files changed, 1 insertion(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/77a2fc5b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 3ea8089..9f7c760 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -552,12 +552,6 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
-  val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse")
-.internal()
-.doc("When true, the planner will try to find out duplicated subqueries 
and re-use them.")
-.booleanConf
-.createWithDefault(true)
-
   val STATE_STORE_PROVIDER_CLASS =
 buildConf("spark.sql.streaming.stateStore.providerClass")
   .internal()
@@ -938,8 +932,6 @@ class SQLConf extends Serializable with Logging {
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
 
-  def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)
-
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
   def constraintPropagationEnabled: Boolean = 
getConf(CONSTRAINT_PROPAGATION_ENABLED)

http://git-wip-us.apache.org/repos/asf/spark/blob/77a2fc5b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 04c1303..bd7a5c5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -599,9 +599,6 @@ case class OutputFakerExec(output: Seq[Attribute], child: 
SparkPlan) extends Spa
  */
 case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
-  // Ignore this wrapper for canonicalizing.
-  override lazy val canonicalized: SparkPlan = child.canonicalized
-
   override lazy val metrics = Map(
 "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
 "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect 
(ms)"))

http://git-wip-us.apache.org/repos/asf/spark/blob/77a2fc5b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 2abeadf..d11045f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -156,7 +156,7 @@ case class PlanSubqueries(sparkSession: SparkSession) 
extends Rule[SparkPlan] {
 case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = {
-if (!conf.subqueryReuseEnabled) {
+if (!conf.exchangeReuseEnabled) {
   return plan
 }
 // Build a hash map using schema of subqueries to avoid O(N*N) sameResult 
calls.

http://git-wip-us.apache.org/repos/asf/spark/blob/77a2fc5b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index a7efcaf..68f61cf 100644
--- 

spark git commit: Revert "[SPARK-20941][SQL] Fix SubqueryExec Reuse"

2017-06-14 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 3dda682c4 -> e02e0637f


Revert "[SPARK-20941][SQL] Fix SubqueryExec Reuse"

This reverts commit 6a4e023b250a86887475958093f1d3bdcbb49a03.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e02e0637
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e02e0637
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e02e0637

Branch: refs/heads/branch-2.2
Commit: e02e0637f43dddbdc0961cb2af85869f7ef9e12d
Parents: 3dda682
Author: Xiao Li 
Authored: Wed Jun 14 11:35:14 2017 -0700
Committer: Xiao Li 
Committed: Wed Jun 14 11:35:14 2017 -0700

--
 .../org/apache/spark/sql/internal/SQLConf.scala |  8 -
 .../sql/execution/basicPhysicalOperators.scala  |  3 --
 .../apache/spark/sql/execution/subquery.scala   |  2 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala| 35 
 4 files changed, 1 insertion(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e02e0637/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 1ea9eb5..94244dd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -552,12 +552,6 @@ object SQLConf {
 .booleanConf
 .createWithDefault(true)
 
-  val SUBQUERY_REUSE_ENABLED = buildConf("spark.sql.subquery.reuse")
-.internal()
-.doc("When true, the planner will try to find out duplicated subqueries 
and re-use them.")
-.booleanConf
-.createWithDefault(true)
-
   val STATE_STORE_MIN_DELTAS_FOR_SNAPSHOT =
 buildConf("spark.sql.streaming.stateStore.minDeltasForSnapshot")
   .internal()
@@ -927,8 +921,6 @@ class SQLConf extends Serializable with Logging {
 
   def exchangeReuseEnabled: Boolean = getConf(EXCHANGE_REUSE_ENABLED)
 
-  def subqueryReuseEnabled: Boolean = getConf(SUBQUERY_REUSE_ENABLED)
-
   def caseSensitiveAnalysis: Boolean = getConf(SQLConf.CASE_SENSITIVE)
 
   def constraintPropagationEnabled: Boolean = 
getConf(CONSTRAINT_PROPAGATION_ENABLED)

http://git-wip-us.apache.org/repos/asf/spark/blob/e02e0637/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
index 04c1303..bd7a5c5 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
@@ -599,9 +599,6 @@ case class OutputFakerExec(output: Seq[Attribute], child: 
SparkPlan) extends Spa
  */
 case class SubqueryExec(name: String, child: SparkPlan) extends UnaryExecNode {
 
-  // Ignore this wrapper for canonicalizing.
-  override lazy val canonicalized: SparkPlan = child.canonicalized
-
   override lazy val metrics = Map(
 "dataSize" -> SQLMetrics.createMetric(sparkContext, "data size (bytes)"),
 "collectTime" -> SQLMetrics.createMetric(sparkContext, "time to collect 
(ms)"))

http://git-wip-us.apache.org/repos/asf/spark/blob/e02e0637/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
index 2abeadf..d11045f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/subquery.scala
@@ -156,7 +156,7 @@ case class PlanSubqueries(sparkSession: SparkSession) 
extends Rule[SparkPlan] {
 case class ReuseSubquery(conf: SQLConf) extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = {
-if (!conf.subqueryReuseEnabled) {
+if (!conf.exchangeReuseEnabled) {
   return plan
 }
 // Build a hash map using schema of subqueries to avoid O(N*N) sameResult 
calls.

http://git-wip-us.apache.org/repos/asf/spark/blob/e02e0637/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 2e831dc..cd14d24 100644
--- 

spark git commit: [SPARK-21089][SQL] Fix DESC EXTENDED/FORMATTED to Show Table Properties

2017-06-14 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 626511953 -> 3dda682c4


[SPARK-21089][SQL] Fix DESC EXTENDED/FORMATTED to Show Table Properties

Since table properties and storage properties are written under the same map 
key, table properties are not shown in the output of DESC EXTENDED/FORMATTED 
when the storage properties are not empty.

This PR is to fix the above issue by renaming them to different keys.

Added test cases.

Author: Xiao Li 

Closes #18294 from gatorsmile/tableProperties.

(cherry picked from commit df766a471426625fe86c8845f6261e0fc087772d)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3dda682c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3dda682c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3dda682c

Branch: refs/heads/branch-2.2
Commit: 3dda682c43daa5eb016d1c02a23cc40a443a6523
Parents: 6265119
Author: Xiao Li 
Authored: Wed Jun 14 11:13:16 2017 -0700
Committer: Xiao Li 
Committed: Wed Jun 14 11:19:28 2017 -0700

--
 .../spark/sql/catalyst/catalog/interface.scala  |   4 +-
 .../resources/sql-tests/inputs/describe.sql |   3 +
 .../sql-tests/results/describe.sql.out  | 183 ++-
 3 files changed, 104 insertions(+), 86 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3dda682c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index cc0cbba..976d787 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -75,7 +75,7 @@ case class CatalogStorageFormat(
 CatalogUtils.maskCredentials(properties) match {
   case props if props.isEmpty => // No-op
   case props =>
-map.put("Properties", props.map(p => p._1 + "=" + p._2).mkString("[", 
", ", "]"))
+map.put("Storage Properties", props.map(p => p._1 + "=" + 
p._2).mkString("[", ", ", "]"))
 }
 map
   }
@@ -313,7 +313,7 @@ case class CatalogTable(
   }
 }
 
-if (properties.nonEmpty) map.put("Properties", tableProperties)
+if (properties.nonEmpty) map.put("Table Properties", tableProperties)
 stats.foreach(s => map.put("Statistics", s.simpleString))
 map ++= storage.toLinkedHashMap
 if (tracksPartitionsInCatalog) map.put("Partition Provider", "Catalog")

http://git-wip-us.apache.org/repos/asf/spark/blob/3dda682c/sql/core/src/test/resources/sql-tests/inputs/describe.sql
--
diff --git a/sql/core/src/test/resources/sql-tests/inputs/describe.sql 
b/sql/core/src/test/resources/sql-tests/inputs/describe.sql
index 6de4cf0..91b9668 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/describe.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/describe.sql
@@ -1,4 +1,5 @@
 CREATE TABLE t (a STRING, b INT, c STRING, d STRING) USING parquet
+  OPTIONS (a '1', b '2')
   PARTITIONED BY (c, d) CLUSTERED BY (a) SORTED BY (b ASC) INTO 2 BUCKETS
   COMMENT 'table_comment';
 
@@ -13,6 +14,8 @@ CREATE TEMPORARY VIEW temp_Data_Source_View
 
 CREATE VIEW v AS SELECT * FROM t;
 
+ALTER TABLE t SET TBLPROPERTIES (e = '3');
+
 ALTER TABLE t ADD PARTITION (c='Us', d=1);
 
 DESCRIBE t;

http://git-wip-us.apache.org/repos/asf/spark/blob/3dda682c/sql/core/src/test/resources/sql-tests/results/describe.sql.out
--
diff --git a/sql/core/src/test/resources/sql-tests/results/describe.sql.out 
b/sql/core/src/test/resources/sql-tests/results/describe.sql.out
index 46d32bb..329532c 100644
--- a/sql/core/src/test/resources/sql-tests/results/describe.sql.out
+++ b/sql/core/src/test/resources/sql-tests/results/describe.sql.out
@@ -1,9 +1,10 @@
 -- Automatically generated by SQLQueryTestSuite
--- Number of queries: 31
+-- Number of queries: 32
 
 
 -- !query 0
 CREATE TABLE t (a STRING, b INT, c STRING, d STRING) USING parquet
+  OPTIONS (a '1', b '2')
   PARTITIONED BY (c, d) CLUSTERED BY (a) SORTED BY (b ASC) INTO 2 BUCKETS
   COMMENT 'table_comment'
 -- !query 0 schema
@@ -42,7 +43,7 @@ struct<>
 
 
 -- !query 4
-ALTER TABLE t ADD PARTITION (c='Us', d=1)
+ALTER TABLE t SET TBLPROPERTIES (e = '3')
 -- !query 4 schema
 struct<>
 -- !query 4 output
@@ -50,10 +51,18 @@ struct<>
 
 
 -- !query 5
-DESCRIBE t
+ALTER TABLE t ADD PARTITION (c='Us', d=1)
 -- !query 5 schema

spark git commit: [SPARK-21089][SQL] Fix DESC EXTENDED/FORMATTED to Show Table Properties

2017-06-14 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 0c88e8d37 -> df766a471


[SPARK-21089][SQL] Fix DESC EXTENDED/FORMATTED to Show Table Properties

### What changes were proposed in this pull request?

Since table properties and storage properties are written under the same map 
key, table properties are not shown in the output of DESC EXTENDED/FORMATTED 
when the storage properties are not empty.

This PR is to fix the above issue by renaming them to different keys.
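
A tiny, Spark-free sketch of the key collision being fixed (hypothetical 
property values; not the actual `CatalogTable` code):

```scala
import scala.collection.mutable

// Writing both property groups under the same "Properties" key into one map
// lets the later put overwrite the earlier one, so the table properties
// vanish from the DESC output.
object DescKeyCollisionSketch {
  def main(args: Array[String]): Unit = {
    val before = mutable.LinkedHashMap[String, String]()
    before.put("Properties", "[e=3]")          // table properties written first
    before.put("Properties", "[a=1, b=2]")     // storage properties overwrite them
    println(before)                            // only the storage entry survives

    // After the fix, distinct keys keep both entries visible.
    val after = mutable.LinkedHashMap(
      "Table Properties" -> "[e=3]",
      "Storage Properties" -> "[a=1, b=2]")
    println(after)
  }
}
```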

### How was this patch tested?
Added test cases.

Author: Xiao Li 

Closes #18294 from gatorsmile/tableProperties.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/df766a47
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/df766a47
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/df766a47

Branch: refs/heads/master
Commit: df766a471426625fe86c8845f6261e0fc087772d
Parents: 0c88e8d
Author: Xiao Li 
Authored: Wed Jun 14 11:13:16 2017 -0700
Committer: Xiao Li 
Committed: Wed Jun 14 11:13:16 2017 -0700

--
 .../spark/sql/catalyst/catalog/interface.scala  |   4 +-
 .../resources/sql-tests/inputs/describe.sql |   3 +
 .../describe-table-after-alter-table.sql.out|   2 +-
 .../sql-tests/results/describe.sql.out  | 183 ++-
 4 files changed, 105 insertions(+), 87 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/df766a47/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 2f328cc..c043ed9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -75,7 +75,7 @@ case class CatalogStorageFormat(
 CatalogUtils.maskCredentials(properties) match {
   case props if props.isEmpty => // No-op
   case props =>
-map.put("Properties", props.map(p => p._1 + "=" + p._2).mkString("[", 
", ", "]"))
+map.put("Storage Properties", props.map(p => p._1 + "=" + 
p._2).mkString("[", ", ", "]"))
 }
 map
   }
@@ -316,7 +316,7 @@ case class CatalogTable(
   }
 }
 
-if (properties.nonEmpty) map.put("Properties", tableProperties)
+if (properties.nonEmpty) map.put("Table Properties", tableProperties)
 stats.foreach(s => map.put("Statistics", s.simpleString))
 map ++= storage.toLinkedHashMap
 if (tracksPartitionsInCatalog) map.put("Partition Provider", "Catalog")

http://git-wip-us.apache.org/repos/asf/spark/blob/df766a47/sql/core/src/test/resources/sql-tests/inputs/describe.sql
--
diff --git a/sql/core/src/test/resources/sql-tests/inputs/describe.sql 
b/sql/core/src/test/resources/sql-tests/inputs/describe.sql
index 6de4cf0..91b9668 100644
--- a/sql/core/src/test/resources/sql-tests/inputs/describe.sql
+++ b/sql/core/src/test/resources/sql-tests/inputs/describe.sql
@@ -1,4 +1,5 @@
 CREATE TABLE t (a STRING, b INT, c STRING, d STRING) USING parquet
+  OPTIONS (a '1', b '2')
   PARTITIONED BY (c, d) CLUSTERED BY (a) SORTED BY (b ASC) INTO 2 BUCKETS
   COMMENT 'table_comment';
 
@@ -13,6 +14,8 @@ CREATE TEMPORARY VIEW temp_Data_Source_View
 
 CREATE VIEW v AS SELECT * FROM t;
 
+ALTER TABLE t SET TBLPROPERTIES (e = '3');
+
 ALTER TABLE t ADD PARTITION (c='Us', d=1);
 
 DESCRIBE t;

http://git-wip-us.apache.org/repos/asf/spark/blob/df766a47/sql/core/src/test/resources/sql-tests/results/describe-table-after-alter-table.sql.out
--
diff --git 
a/sql/core/src/test/resources/sql-tests/results/describe-table-after-alter-table.sql.out
 
b/sql/core/src/test/resources/sql-tests/results/describe-table-after-alter-table.sql.out
index eece00d..4bf4633 100644
--- 
a/sql/core/src/test/resources/sql-tests/results/describe-table-after-alter-table.sql.out
+++ 
b/sql/core/src/test/resources/sql-tests/results/describe-table-after-alter-table.sql.out
@@ -57,7 +57,7 @@ Last Access [not included in comparison]
 Type   MANAGED 
 Provider   parquet 
 Commentmodified comment
-Properties [type=parquet]  
+Table Properties   [type=parquet]  
 Location [not included in 
comparison]sql/core/spark-warehouse/table_with_comment
 
 


spark git commit: [SPARK-20211][SQL][BACKPORT-2.2] Fix the Precision and Scale of Decimal Values when the Input is BigDecimal between -1.0 and 1.0

2017-06-14 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 0f3598820 -> 0239b1673


[SPARK-20211][SQL][BACKPORT-2.2] Fix the Precision and Scale of Decimal Values 
when the Input is BigDecimal between -1.0 and 1.0

### What changes were proposed in this pull request?

This PR is to backport https://github.com/apache/spark/pull/18244 to 2.2

---

The precision and scale of decimal values are wrong when the input is 
BigDecimal between -1.0 and 1.0.

A BigDecimal's precision is the digit count starting from the leftmost nonzero 
digit, per [Java's BigDecimal 
definition](https://docs.oracle.com/javase/7/docs/api/java/math/BigDecimal.html).
However, our Decimal definition follows the database decimal standard, which is 
the total number of digits, including those both to the left and to the right 
of the decimal point. This PR fixes the issue by converting between the two 
definitions.

Before this PR, the following queries failed:
```SQL
select 1 > 0.0001
select floor(0.0001)
select ceil(0.0001)
```
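
A small, Spark-free sketch of the precision/scale mismatch; it only mirrors the 
adjustment shown in the `Decimal.set` diff below:

```scala
object DecimalPrecisionExample {
  def main(args: Array[String]): Unit = {
    // For values between -1.0 and 1.0, JVM BigDecimal precision can be
    // smaller than the scale, while a database-style decimal needs
    // precision >= scale.
    val d = BigDecimal("0.0001")
    println(s"precision=${d.precision}, scale=${d.scale}")  // precision=1, scale=4
    // The adjustment made in Decimal.set: when precision <= scale, use scale + 1.
    val adjusted = if (d.precision <= d.scale) d.scale + 1 else d.precision
    println(s"adjusted precision=$adjusted")                 // 5
  }
}
```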

### How was this patch tested?
Added test cases.

Author: gatorsmile 

Closes #18297 from gatorsmile/backport18244.

(cherry picked from commit 626511953b87747e933e4f64b9fcd4c4776a5c4e)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0239b167
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0239b167
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0239b167

Branch: refs/heads/branch-2.0
Commit: 0239b167359ce94deb13eaa447a692f2ea590f52
Parents: 0f35988
Author: gatorsmile 
Authored: Wed Jun 14 19:18:28 2017 +0800
Committer: Wenchen Fan 
Committed: Wed Jun 14 19:18:58 2017 +0800

--
 .../org/apache/spark/sql/types/Decimal.scala|  10 +-
 .../apache/spark/sql/types/DecimalSuite.scala   |  10 ++
 .../resources/sql-tests/inputs/arithmetic.sql   |  24 
 .../sql-tests/results/arithmetic.sql.out| 134 ++-
 4 files changed, 176 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0239b167/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 7085905..e262bd5 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -124,7 +124,15 @@ final class Decimal extends Ordered[Decimal] with 
Serializable {
   def set(decimal: BigDecimal): Decimal = {
 this.decimalVal = decimal
 this.longVal = 0L
-this._precision = decimal.precision
+if (decimal.precision <= decimal.scale) {
+  // For Decimal, we expect the precision is equal to or large than the 
scale, however,
+  // in BigDecimal, the digit count starts from the leftmost nonzero digit 
of the exact
+  // result. For example, the precision of 0.01 equals to 1 based on the 
definition, but
+  // the scale is 2. The expected precision should be 3.
+  this._precision = decimal.scale + 1
+} else {
+  this._precision = decimal.precision
+}
 this._scale = decimal.scale
 this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/0239b167/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
index 4cf329d..9b401cf 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
@@ -34,6 +34,16 @@ class DecimalSuite extends SparkFunSuite with 
PrivateMethodTester {
 
   test("creating decimals") {
 checkDecimal(new Decimal(), "0", 1, 0)
+checkDecimal(Decimal(BigDecimal("0.09")), "0.09", 3, 2)
+checkDecimal(Decimal(BigDecimal("0.9")), "0.9", 2, 1)
+checkDecimal(Decimal(BigDecimal("0.90")), "0.90", 3, 2)
+checkDecimal(Decimal(BigDecimal("0.0")), "0.0", 2, 1)
+checkDecimal(Decimal(BigDecimal("0")), "0", 1, 0)
+checkDecimal(Decimal(BigDecimal("1.0")), "1.0", 2, 1)
+checkDecimal(Decimal(BigDecimal("-0.09")), "-0.09", 3, 2)
+checkDecimal(Decimal(BigDecimal("-0.9")), "-0.9", 2, 1)
+checkDecimal(Decimal(BigDecimal("-0.90")), "-0.90", 3, 2)
+checkDecimal(Decimal(BigDecimal("-1.0")), "-1.0", 2, 1)
 checkDecimal(Decimal(BigDecimal("10.030")), "10.030", 5, 3)
 checkDecimal(Decimal(BigDecimal("10.030"), 4, 1), 

spark git commit: [SPARK-20211][SQL][BACKPORT-2.2] Fix the Precision and Scale of Decimal Values when the Input is BigDecimal between -1.0 and 1.0

2017-06-14 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 ee0e74e65 -> a890466bc


[SPARK-20211][SQL][BACKPORT-2.2] Fix the Precision and Scale of Decimal Values 
when the Input is BigDecimal between -1.0 and 1.0

### What changes were proposed in this pull request?

This PR is to backport https://github.com/apache/spark/pull/18244 to 2.2

---

The precision and scale of decimal values are wrong when the input is 
BigDecimal between -1.0 and 1.0.

A BigDecimal's precision is the digit count starting from the leftmost nonzero 
digit, per [Java's BigDecimal 
definition](https://docs.oracle.com/javase/7/docs/api/java/math/BigDecimal.html).
However, our Decimal definition follows the database decimal standard, which is 
the total number of digits, including those both to the left and to the right 
of the decimal point. This PR fixes the issue by converting between the two 
definitions.

Before this PR, the following queries failed:
```SQL
select 1 > 0.0001
select floor(0.0001)
select ceil(0.0001)
```

### How was this patch tested?
Added test cases.

Author: gatorsmile 

Closes #18297 from gatorsmile/backport18244.

(cherry picked from commit 626511953b87747e933e4f64b9fcd4c4776a5c4e)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a890466b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a890466b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a890466b

Branch: refs/heads/branch-2.1
Commit: a890466bcc600941927c7040aee409d82b7587d6
Parents: ee0e74e
Author: gatorsmile 
Authored: Wed Jun 14 19:18:28 2017 +0800
Committer: Wenchen Fan 
Committed: Wed Jun 14 19:18:43 2017 +0800

--
 .../org/apache/spark/sql/types/Decimal.scala|  10 +-
 .../apache/spark/sql/types/DecimalSuite.scala   |  10 ++
 .../resources/sql-tests/inputs/arithmetic.sql   |  24 
 .../sql-tests/results/arithmetic.sql.out| 134 ++-
 4 files changed, 176 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a890466b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 465fb83..1807fd6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -125,7 +125,15 @@ final class Decimal extends Ordered[Decimal] with 
Serializable {
   def set(decimal: BigDecimal): Decimal = {
 this.decimalVal = decimal
 this.longVal = 0L
-this._precision = decimal.precision
+if (decimal.precision <= decimal.scale) {
+  // For Decimal, we expect the precision is equal to or large than the 
scale, however,
+  // in BigDecimal, the digit count starts from the leftmost nonzero digit 
of the exact
+  // result. For example, the precision of 0.01 equals to 1 based on the 
definition, but
+  // the scale is 2. The expected precision should be 3.
+  this._precision = decimal.scale + 1
+} else {
+  this._precision = decimal.precision
+}
 this._scale = decimal.scale
 this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/a890466b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
index 52d0692..6a71aca 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
@@ -32,6 +32,16 @@ class DecimalSuite extends SparkFunSuite with 
PrivateMethodTester {
 
   test("creating decimals") {
 checkDecimal(new Decimal(), "0", 1, 0)
+checkDecimal(Decimal(BigDecimal("0.09")), "0.09", 3, 2)
+checkDecimal(Decimal(BigDecimal("0.9")), "0.9", 2, 1)
+checkDecimal(Decimal(BigDecimal("0.90")), "0.90", 3, 2)
+checkDecimal(Decimal(BigDecimal("0.0")), "0.0", 2, 1)
+checkDecimal(Decimal(BigDecimal("0")), "0", 1, 0)
+checkDecimal(Decimal(BigDecimal("1.0")), "1.0", 2, 1)
+checkDecimal(Decimal(BigDecimal("-0.09")), "-0.09", 3, 2)
+checkDecimal(Decimal(BigDecimal("-0.9")), "-0.9", 2, 1)
+checkDecimal(Decimal(BigDecimal("-0.90")), "-0.90", 3, 2)
+checkDecimal(Decimal(BigDecimal("-1.0")), "-1.0", 2, 1)
 checkDecimal(Decimal(BigDecimal("10.030")), "10.030", 5, 3)
 checkDecimal(Decimal(BigDecimal("10.030"), 4, 1), 

spark git commit: [SPARK-20211][SQL][BACKPORT-2.2] Fix the Precision and Scale of Decimal Values when the Input is BigDecimal between -1.0 and 1.0

2017-06-14 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 9bdc83590 -> 626511953


[SPARK-20211][SQL][BACKPORT-2.2] Fix the Precision and Scale of Decimal Values 
when the Input is BigDecimal between -1.0 and 1.0

### What changes were proposed in this pull request?

This PR is to backport https://github.com/apache/spark/pull/18244 to 2.2

---

The precision and scale of decimal values are wrong when the input is 
BigDecimal between -1.0 and 1.0.

A BigDecimal's precision is the digit count starting from the leftmost nonzero 
digit, per [Java's BigDecimal 
definition](https://docs.oracle.com/javase/7/docs/api/java/math/BigDecimal.html).
However, our Decimal definition follows the database decimal standard, which is 
the total number of digits, including those both to the left and to the right 
of the decimal point. This PR fixes the issue by converting between the two 
definitions.

Before this PR, the following queries failed:
```SQL
select 1 > 0.0001
select floor(0.0001)
select ceil(0.0001)
```

### How was this patch tested?
Added test cases.

Author: gatorsmile 

Closes #18297 from gatorsmile/backport18244.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/62651195
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/62651195
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/62651195

Branch: refs/heads/branch-2.2
Commit: 626511953b87747e933e4f64b9fcd4c4776a5c4e
Parents: 9bdc835
Author: gatorsmile 
Authored: Wed Jun 14 19:18:28 2017 +0800
Committer: Wenchen Fan 
Committed: Wed Jun 14 19:18:28 2017 +0800

--
 .../org/apache/spark/sql/types/Decimal.scala|  10 +-
 .../apache/spark/sql/types/DecimalSuite.scala   |  10 ++
 .../resources/sql-tests/inputs/arithmetic.sql   |  24 
 .../sql-tests/results/arithmetic.sql.out| 134 ++-
 4 files changed, 176 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/62651195/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
index 80916ee..1f1fb51 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/types/Decimal.scala
@@ -126,7 +126,15 @@ final class Decimal extends Ordered[Decimal] with 
Serializable {
   def set(decimal: BigDecimal): Decimal = {
 this.decimalVal = decimal
 this.longVal = 0L
-this._precision = decimal.precision
+if (decimal.precision <= decimal.scale) {
+  // For Decimal, we expect the precision is equal to or large than the 
scale, however,
+  // in BigDecimal, the digit count starts from the leftmost nonzero digit 
of the exact
+  // result. For example, the precision of 0.01 equals to 1 based on the 
definition, but
+  // the scale is 2. The expected precision should be 3.
+  this._precision = decimal.scale + 1
+} else {
+  this._precision = decimal.precision
+}
 this._scale = decimal.scale
 this
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/62651195/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
index 93c231e..144f3d6 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/types/DecimalSuite.scala
@@ -32,6 +32,16 @@ class DecimalSuite extends SparkFunSuite with 
PrivateMethodTester {
 
   test("creating decimals") {
 checkDecimal(new Decimal(), "0", 1, 0)
+checkDecimal(Decimal(BigDecimal("0.09")), "0.09", 3, 2)
+checkDecimal(Decimal(BigDecimal("0.9")), "0.9", 2, 1)
+checkDecimal(Decimal(BigDecimal("0.90")), "0.90", 3, 2)
+checkDecimal(Decimal(BigDecimal("0.0")), "0.0", 2, 1)
+checkDecimal(Decimal(BigDecimal("0")), "0", 1, 0)
+checkDecimal(Decimal(BigDecimal("1.0")), "1.0", 2, 1)
+checkDecimal(Decimal(BigDecimal("-0.09")), "-0.09", 3, 2)
+checkDecimal(Decimal(BigDecimal("-0.9")), "-0.9", 2, 1)
+checkDecimal(Decimal(BigDecimal("-0.90")), "-0.90", 3, 2)
+checkDecimal(Decimal(BigDecimal("-1.0")), "-1.0", 2, 1)
 checkDecimal(Decimal(BigDecimal("10.030")), "10.030", 5, 3)
 checkDecimal(Decimal(BigDecimal("10.030"), 4, 1), "10.0", 4, 1)
 checkDecimal(Decimal(BigDecimal("-9.95"), 4, 1), "-10.0", 4, 1)


spark git commit: [SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1

2017-06-14 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 42cc83082 -> 9bdc83590


[SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1

### What changes were proposed in this pull request?
Before this PR, Spark was unable to read a partitioned table created by Spark 
2.1 when the table schema does not put the partitioning columns at the end of 
the schema, which causes this assertion to fail:
[assert(partitionFields.map(_.name) == partitionColumnNames)](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L234-L236)

When reading the table metadata from the metastore, we also need to reorder the 
columns.
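
As a rough sketch of the reordering (simplified from the patch's `reorderSchema`, 
omitting its corrupted-metadata error handling; the column names are made up):

```scala
import org.apache.spark.sql.types._

// Schema as Spark 2.1 may have persisted it, with partition column "p" in the middle.
val storedSchema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("p", StringType),
  StructField("b", StringType)))
val partColumnNames = Seq("p")

// Move the partition columns to the end, which is where the catalog now expects them.
val partitionFields = partColumnNames.flatMap(c => storedSchema.find(_.name == c))
val reordered = StructType(storedSchema.filterNot(partitionFields.contains) ++ partitionFields)
// reordered.fieldNames => Array(a, b, p)
```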

### How was this patch tested?
Added test cases to check both Hive-serde and data source tables.

Author: gatorsmile 

Closes #18295 from gatorsmile/reorderReadSchema.

(cherry picked from commit 0c88e8d37224713199ca5661c2cd57f5918dcb9a)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9bdc8359
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9bdc8359
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9bdc8359

Branch: refs/heads/branch-2.2
Commit: 9bdc83590922e0e1f22424904411acb0d1b37a11
Parents: 42cc830
Author: gatorsmile 
Authored: Wed Jun 14 16:28:06 2017 +0800
Committer: Wenchen Fan 
Committed: Wed Jun 14 16:28:21 2017 +0800

--
 .../spark/sql/hive/HiveExternalCatalog.scala| 31 
 .../sql/hive/HiveExternalCatalogSuite.scala | 26 
 2 files changed, 52 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9bdc8359/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index ba48fac..a03beb7 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -717,6 +717,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
   properties = table.properties.filterNot { case (key, _) => 
key.startsWith(SPARK_SQL_PREFIX) })
   }
 
+  // Reorder table schema to put partition columns at the end. Before Spark 
2.2, the partition
+  // columns are not put at the end of schema. We need to reorder it when 
reading the schema
+  // from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): 
StructType = {
+val partitionFields = partColumnNames.map { partCol =>
+  schema.find(_.name == partCol).getOrElse {
+throw new AnalysisException("The metadata is corrupted. Unable to find 
the " +
+  s"partition column names from the schema. schema: 
${schema.catalogString}. " +
+  s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+  }
+}
+StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
 val hiveTable = table.copy(
   provider = Some(DDLUtils.HIVE_PROVIDER),
@@ -726,10 +740,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
 // schema from table properties.
 if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
   val schemaFromTableProps = getSchemaFromTableProperties(table)
-  if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, 
table.schema)) {
+  val partColumnNames = getPartitionColumnsFromTableProperties(table)
+  val reorderedSchema = reorderSchema(schema = schemaFromTableProps, 
partColumnNames)
+
+  if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, 
table.schema)) {
 hiveTable.copy(
-  schema = schemaFromTableProps,
-  partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+  schema = reorderedSchema,
+  partitionColumnNames = partColumnNames,
   bucketSpec = getBucketSpecFromTableProperties(table))
   } else {
 // Hive metastore may change the table schema, e.g. schema inference. 
If the table
@@ -759,11 +776,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
 }
 val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
+val schemaFromTableProps = getSchemaFromTableProperties(table)
+val partColumnNames = getPartitionColumnsFromTableProperties(table)
+val reorderedSchema = reorderSchema(schema = schemaFromTableProps, partColumnNames)

spark git commit: [SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1

2017-06-14 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master d6f76eb34 -> 0c88e8d37


[SPARK-21085][SQL] Failed to read the partitioned table created by Spark 2.1

### What changes were proposed in this pull request?
Before this PR, Spark was unable to read a partitioned table created by Spark 
2.1 when the table schema does not put the partitioning columns at the end of 
the schema, which causes this assertion to fail:
[assert(partitionFields.map(_.name) == partitionColumnNames)](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala#L234-L236)

When reading the table metadata from the metastore, we also need to reorder the 
columns.

### How was this patch tested?
Added test cases to check both Hive-serde and data source tables.

Author: gatorsmile 

Closes #18295 from gatorsmile/reorderReadSchema.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c88e8d3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c88e8d3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c88e8d3

Branch: refs/heads/master
Commit: 0c88e8d37224713199ca5661c2cd57f5918dcb9a
Parents: d6f76eb
Author: gatorsmile 
Authored: Wed Jun 14 16:28:06 2017 +0800
Committer: Wenchen Fan 
Committed: Wed Jun 14 16:28:06 2017 +0800

--
 .../spark/sql/hive/HiveExternalCatalog.scala| 31 
 .../sql/hive/HiveExternalCatalogSuite.scala | 26 
 2 files changed, 52 insertions(+), 5 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c88e8d3/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 7fcf06d..1945367 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -729,6 +729,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
   properties = table.properties.filterNot { case (key, _) => 
key.startsWith(SPARK_SQL_PREFIX) })
   }
 
+  // Reorder table schema to put partition columns at the end. Before Spark 
2.2, the partition
+  // columns are not put at the end of schema. We need to reorder it when 
reading the schema
+  // from the table properties.
+  private def reorderSchema(schema: StructType, partColumnNames: Seq[String]): 
StructType = {
+val partitionFields = partColumnNames.map { partCol =>
+  schema.find(_.name == partCol).getOrElse {
+throw new AnalysisException("The metadata is corrupted. Unable to find 
the " +
+  s"partition column names from the schema. schema: 
${schema.catalogString}. " +
+  s"Partition columns: ${partColumnNames.mkString("[", ", ", "]")}")
+  }
+}
+StructType(schema.filterNot(partitionFields.contains) ++ partitionFields)
+  }
+
   private def restoreHiveSerdeTable(table: CatalogTable): CatalogTable = {
 val hiveTable = table.copy(
   provider = Some(DDLUtils.HIVE_PROVIDER),
@@ -738,10 +752,13 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
 // schema from table properties.
 if (table.properties.contains(DATASOURCE_SCHEMA_NUMPARTS)) {
   val schemaFromTableProps = getSchemaFromTableProperties(table)
-  if (DataType.equalsIgnoreCaseAndNullability(schemaFromTableProps, 
table.schema)) {
+  val partColumnNames = getPartitionColumnsFromTableProperties(table)
+  val reorderedSchema = reorderSchema(schema = schemaFromTableProps, 
partColumnNames)
+
+  if (DataType.equalsIgnoreCaseAndNullability(reorderedSchema, 
table.schema)) {
 hiveTable.copy(
-  schema = schemaFromTableProps,
-  partitionColumnNames = getPartitionColumnsFromTableProperties(table),
+  schema = reorderedSchema,
+  partitionColumnNames = partColumnNames,
   bucketSpec = getBucketSpecFromTableProperties(table))
   } else {
 // Hive metastore may change the table schema, e.g. schema inference. 
If the table
@@ -771,11 +788,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
 }
 val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
+val schemaFromTableProps = getSchemaFromTableProperties(table)
+val partColumnNames = getPartitionColumnsFromTableProperties(table)
+val reorderedSchema = reorderSchema(schema = schemaFromTableProps, 
partColumnNames)
+
 table.copy(
   provider = Some(provider),
   storage = 

spark git commit: [SPARK-21057][ML] Do not use a PascalDistribution in countApprox

2017-06-14 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 4d01aa464 -> d6f76eb34


[SPARK-21057][ML] Do not use a PascalDistribution in countApprox

## What changes were proposed in this pull request?

Use Poisson analysis for approx count in all cases.
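
A small sketch of the Poisson bound now used in all cases (the inputs below are 
made up; only the distribution math mirrors `CountEvaluator.bound`):

```scala
import org.apache.commons.math3.distribution.PoissonDistribution

// Having counted `sum` elements after scanning a fraction p of the partitions,
// the count still to come is modeled as Poisson with mean sum * (1 - p) / p.
val sum = 1000L
val p = 0.1
val confidence = 0.95
val dist = new PoissonDistribution(sum * (1 - p) / p)
val low = dist.inverseCumulativeProbability((1 - confidence) / 2)
val high = dist.inverseCumulativeProbability((1 + confidence) / 2)
// At 95% confidence the total count lies roughly in [sum + low, sum + high],
// centered near the point estimate sum / p = 10000.
```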

## How was this patch tested?

Existing tests.

Author: Sean Owen 

Closes #18276 from srowen/SPARK-21057.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d6f76eb3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d6f76eb3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d6f76eb3

Branch: refs/heads/master
Commit: d6f76eb346b691a553e89d3283f2a235661ae78c
Parents: 4d01aa4
Author: Sean Owen 
Authored: Wed Jun 14 09:01:20 2017 +0100
Committer: Sean Owen 
Committed: Wed Jun 14 09:01:20 2017 +0100

--
 .../apache/spark/partial/CountEvaluator.scala   | 23 +---
 .../spark/partial/CountEvaluatorSuite.scala | 12 +-
 2 files changed, 12 insertions(+), 23 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d6f76eb3/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
--
diff --git a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala 
b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
index 5a5bd7f..cbee136 100644
--- a/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
+++ b/core/src/main/scala/org/apache/spark/partial/CountEvaluator.scala
@@ -17,7 +17,7 @@
 
 package org.apache.spark.partial
 
-import org.apache.commons.math3.distribution.{PascalDistribution, 
PoissonDistribution}
+import org.apache.commons.math3.distribution.PoissonDistribution
 
 /**
  * An ApproximateEvaluator for counts.
@@ -48,22 +48,11 @@ private[spark] class CountEvaluator(totalOutputs: Int, 
confidence: Double)
 private[partial] object CountEvaluator {
 
   def bound(confidence: Double, sum: Long, p: Double): BoundedDouble = {
-// Let the total count be N. A fraction p has been counted already, with 
sum 'sum',
-// as if each element from the total data set had been seen with 
probability p.
-val dist =
-  if (sum <= 1) {
-// The remaining count, k=N-sum, may be modeled as negative binomial 
(aka Pascal),
-// where there have been 'sum' successes of probability p already. 
(There are several
-// conventions, but this is the one followed by Commons Math3.)
-new PascalDistribution(sum.toInt, p)
-  } else {
-// For large 'sum' (certainly, > Int.MaxValue!), use a Poisson 
approximation, which has
-// a different interpretation. "sum" elements have been observed 
having scanned a fraction
-// p of the data. This suggests data is counted at a rate of sum / p 
across the whole data
-// set. The total expected count from the rest is distributed as
-// (1-p) Poisson(sum / p) = Poisson(sum*(1-p)/p)
-new PoissonDistribution(sum * (1 - p) / p)
-  }
+// "sum" elements have been observed having scanned a fraction
+// p of the data. This suggests data is counted at a rate of sum / p 
across the whole data
+// set. The total expected count from the rest is distributed as
+// (1-p) Poisson(sum / p) = Poisson(sum*(1-p)/p)
+val dist = new PoissonDistribution(sum * (1 - p) / p)
 // Not quite symmetric; calculate interval straight from discrete 
distribution
 val low = dist.inverseCumulativeProbability((1 - confidence) / 2)
 val high = dist.inverseCumulativeProbability((1 + confidence) / 2)

http://git-wip-us.apache.org/repos/asf/spark/blob/d6f76eb3/core/src/test/scala/org/apache/spark/partial/CountEvaluatorSuite.scala
--
diff --git 
a/core/src/test/scala/org/apache/spark/partial/CountEvaluatorSuite.scala 
b/core/src/test/scala/org/apache/spark/partial/CountEvaluatorSuite.scala
index da3256b..3c1208c 100644
--- a/core/src/test/scala/org/apache/spark/partial/CountEvaluatorSuite.scala
+++ b/core/src/test/scala/org/apache/spark/partial/CountEvaluatorSuite.scala
@@ -23,21 +23,21 @@ class CountEvaluatorSuite extends SparkFunSuite {
 
   test("test count 0") {
 val evaluator = new CountEvaluator(10, 0.95)
-assert(new BoundedDouble(0.0, 0.0, 0.0, Double.PositiveInfinity) == 
evaluator.currentResult())
+assert(evaluator.currentResult() === new BoundedDouble(0.0, 0.0, 0.0, 
Double.PositiveInfinity))
 evaluator.merge(1, 0)
-assert(new BoundedDouble(0.0, 0.0, 0.0, Double.PositiveInfinity) == 
evaluator.currentResult())
+assert(evaluator.currentResult() === new BoundedDouble(0.0, 0.0, 0.0, 
Double.PositiveInfinity))
   }
 
   test("test count >= 1") {
 

spark git commit: [SPARK-20754][SQL][FOLLOWUP] Add Function Alias For MOD/POSITION.

2017-06-14 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master dccc0aa3c -> 4d01aa464


[SPARK-20754][SQL][FOLLOWUP] Add Function Alias For MOD/POSITION.

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/18106 added support for TRUNC (number); we 
should also add function aliases for `MOD` and `POSITION`.

`POSITION(substr IN str)` is a synonym for `LOCATE(substr, str)`, same as in MySQL: 
https://dev.mysql.com/doc/refman/5.7/en/string-functions.html#function_position
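
Illustrative usage of the new aliases (a standalone sketch, not part of the patch):

```scala
import org.apache.spark.sql.SparkSession

// Spin up a local session built from this branch just to exercise the aliases.
val spark = SparkSession.builder().master("local[*]").appName("position-mod-demo").getOrCreate()

// position(substr IN str) resolves to the same expression as locate(substr, str),
// and mod(a, b) maps to the Remainder expression, i.e. a % b.
spark.sql("SELECT position('bar' IN 'foobar') AS pos, mod(7, 3) AS m").show()
// pos = 4 (1-based index of 'bar' in 'foobar'), m = 1

spark.stop()
```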

## How was this patch tested?

unit tests

Author: Yuming Wang 

Closes #18206 from wangyum/SPARK-20754-mod


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d01aa46
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d01aa46
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d01aa46

Branch: refs/heads/master
Commit: 4d01aa46489bb2e5f7c143d5e58981b3eb2f4a7d
Parents: dccc0aa
Author: Yuming Wang 
Authored: Tue Jun 13 23:39:06 2017 -0700
Committer: Xiao Li 
Committed: Tue Jun 13 23:39:06 2017 -0700

--
 .../org/apache/spark/sql/catalyst/parser/SqlBase.g4   |  3 +++
 .../spark/sql/catalyst/analysis/FunctionRegistry.scala|  2 ++
 .../spark/sql/catalyst/expressions/arithmetic.scala   |  2 ++
 .../sql/catalyst/expressions/stringExpressions.scala  |  4 
 .../org/apache/spark/sql/catalyst/parser/AstBuilder.scala |  7 +++
 .../sql/catalyst/parser/TableIdentifierParserSuite.scala  |  2 +-
 .../src/test/resources/sql-tests/inputs/operators.sql |  5 -
 .../test/resources/sql-tests/inputs/string-functions.sql  |  3 +++
 .../test/resources/sql-tests/results/operators.sql.out| 10 +-
 .../resources/sql-tests/results/string-functions.sql.out  | 10 +-
 10 files changed, 44 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4d01aa46/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 43f7ff5..ef5648c 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -563,6 +563,7 @@ primaryExpression
 | CAST '(' expression AS dataType ')'  
#cast
 | FIRST '(' expression (IGNORE NULLS)? ')' 
#first
 | LAST '(' expression (IGNORE NULLS)? ')'  
#last
+| POSITION '(' substr=valueExpression IN str=valueExpression ')'   
#position
 | constant 
#constantDefault
 | ASTERISK 
#star
 | qualifiedName '.' ASTERISK   
#star
@@ -720,6 +721,7 @@ nonReserved
 | SET | RESET
 | VIEW | REPLACE
 | IF
+| POSITION
 | NO | DATA
 | START | TRANSACTION | COMMIT | ROLLBACK | IGNORE
 | SORT | CLUSTER | DISTRIBUTE | UNSET | TBLPROPERTIES | SKEWED | STORED | 
DIRECTORIES | LOCATION
@@ -850,6 +852,7 @@ MACRO: 'MACRO';
 IGNORE: 'IGNORE';
 
 IF: 'IF';
+POSITION: 'POSITION';
 
 EQ  : '=' | '==';
 NSEQ: '<=>';

http://git-wip-us.apache.org/repos/asf/spark/blob/4d01aa46/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 4245b70..8773281 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -240,6 +240,7 @@ object FunctionRegistry {
 expression[Log1p]("log1p"),
 expression[Log2]("log2"),
 expression[Log]("ln"),
+expression[Remainder]("mod"),
 expression[UnaryMinus]("negative"),
 expression[Pi]("pi"),
 expression[Pmod]("pmod"),
@@ -325,6 +326,7 @@ object FunctionRegistry {
 expression[StringTrimLeft]("ltrim"),
 expression[JsonTuple]("json_tuple"),
 expression[ParseUrl]("parse_url"),
+expression[StringLocate]("position"),
 expression[FormatString]("printf"),