spark git commit: [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.

2017-03-08 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 029e40b41 -> eeb1d6db8


[SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.

## What changes were proposed in this pull request?

A follow-up to SPARK-19859:

- extract the calculation of `delayMs` and reuse it.
- update EventTimeWatermarkExec
- use the correct `delayMs` in EventTimeWatermark

## How was this patch tested?

Jenkins.

Author: uncleGen 

Closes #17221 from uncleGen/SPARK-19859.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eeb1d6db
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eeb1d6db
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eeb1d6db

Branch: refs/heads/master
Commit: eeb1d6db878641d9eac62d0869a90fe80c1f4461
Parents: 029e40b
Author: uncleGen 
Authored: Wed Mar 8 23:23:10 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Mar 8 23:23:10 2017 -0800

--
 .../plans/logical/EventTimeWatermark.scala   |  9 -
 .../streaming/EventTimeWatermarkExec.scala   | 19 +++
 2 files changed, 19 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eeb1d6db/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
index 62f68a6..06196b5 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -24,6 +24,12 @@ import org.apache.spark.unsafe.types.CalendarInterval
 object EventTimeWatermark {
   /** The [[org.apache.spark.sql.types.Metadata]] key used to hold the 
eventTime watermark delay. */
   val delayKey = "spark.watermarkDelayMs"
+
+  def getDelayMs(delay: CalendarInterval): Long = {
+// We define month as `31 days` to simplify calculation.
+val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+delay.milliseconds + delay.months * millisPerMonth
+  }
 }
 
 /**
@@ -37,9 +43,10 @@ case class EventTimeWatermark(
   // Update the metadata on the eventTime column to include the desired delay.
   override val output: Seq[Attribute] = child.output.map { a =>
 if (a semanticEquals eventTime) {
+  val delayMs = EventTimeWatermark.getDelayMs(delay)
   val updatedMetadata = new MetadataBuilder()
 .withMetadata(a.metadata)
-.putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+.putLong(EventTimeWatermark.delayKey, delayMs)
 .build()
   a.withMetadata(updatedMetadata)
 } else if (a.metadata.contains(EventTimeWatermark.delayKey)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/eeb1d6db/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index 5a9a99e..25cf609 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -84,10 +84,7 @@ case class EventTimeWatermarkExec(
 child: SparkPlan) extends SparkPlan {
 
   val eventTimeStats = new EventTimeStatsAccum()
-  val delayMs = {
-val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
-delay.milliseconds + delay.months * millisPerMonth
-  }
+  val delayMs = EventTimeWatermark.getDelayMs(delay)
 
   sparkContext.register(eventTimeStats)
 
@@ -105,10 +102,16 @@ case class EventTimeWatermarkExec(
   override val output: Seq[Attribute] = child.output.map { a =>
 if (a semanticEquals eventTime) {
   val updatedMetadata = new MetadataBuilder()
-  .withMetadata(a.metadata)
-  .putLong(EventTimeWatermark.delayKey, delayMs)
-  .build()
-
+.withMetadata(a.metadata)
+.putLong(EventTimeWatermark.delayKey, delayMs)
+.build()
+  a.withMetadata(updatedMetadata)
+} else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
+  // Remove existing watermark
+  val updatedMetadata = new MetadataBuilder()
+.withMetadata(a.metadata)
+.remove(EventTimeWatermark.delayKey)
+.build()
   a.withMetadata(updatedMetadata)

spark git commit: [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.

2017-03-08 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 00859e148 -> 0c140c168


[SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.

## What changes were proposed in this pull request?

A follow-up to SPARK-19859:

- extract the calculation of `delayMs` and reuse it.
- update EventTimeWatermarkExec
- use the correct `delayMs` in EventTimeWatermark

## How was this patch tested?

Jenkins.

Author: uncleGen 

Closes #17221 from uncleGen/SPARK-19859.

(cherry picked from commit eeb1d6db878641d9eac62d0869a90fe80c1f4461)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0c140c16
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0c140c16
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0c140c16

Branch: refs/heads/branch-2.1
Commit: 0c140c1682262bc27df94952bda6ad8e3229fda4
Parents: 00859e1
Author: uncleGen 
Authored: Wed Mar 8 23:23:10 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Mar 8 23:23:16 2017 -0800

--
 .../plans/logical/EventTimeWatermark.scala   |  9 -
 .../streaming/EventTimeWatermarkExec.scala   | 19 +++
 2 files changed, 19 insertions(+), 9 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0c140c16/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
index c919cdb..e0dd4c9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/EventTimeWatermark.scala
@@ -24,6 +24,12 @@ import org.apache.spark.unsafe.types.CalendarInterval
 object EventTimeWatermark {
   /** The [[org.apache.spark.sql.types.Metadata]] key used to hold the 
eventTime watermark delay. */
   val delayKey = "spark.watermarkDelayMs"
+
+  def getDelayMs(delay: CalendarInterval): Long = {
+// We define month as `31 days` to simplify calculation.
+val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
+delay.milliseconds + delay.months * millisPerMonth
+  }
 }
 
 /**
@@ -37,9 +43,10 @@ case class EventTimeWatermark(
   // Update the metadata on the eventTime column to include the desired delay.
   override val output: Seq[Attribute] = child.output.map { a =>
 if (a semanticEquals eventTime) {
+  val delayMs = EventTimeWatermark.getDelayMs(delay)
   val updatedMetadata = new MetadataBuilder()
 .withMetadata(a.metadata)
-.putLong(EventTimeWatermark.delayKey, delay.milliseconds)
+.putLong(EventTimeWatermark.delayKey, delayMs)
 .build()
   a.withMetadata(updatedMetadata)
 } else if (a.metadata.contains(EventTimeWatermark.delayKey)) {

http://git-wip-us.apache.org/repos/asf/spark/blob/0c140c16/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
index 5a9a99e..25cf609 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/EventTimeWatermarkExec.scala
@@ -84,10 +84,7 @@ case class EventTimeWatermarkExec(
 child: SparkPlan) extends SparkPlan {
 
   val eventTimeStats = new EventTimeStatsAccum()
-  val delayMs = {
-val millisPerMonth = CalendarInterval.MICROS_PER_DAY / 1000 * 31
-delay.milliseconds + delay.months * millisPerMonth
-  }
+  val delayMs = EventTimeWatermark.getDelayMs(delay)
 
   sparkContext.register(eventTimeStats)
 
@@ -105,10 +102,16 @@ case class EventTimeWatermarkExec(
   override val output: Seq[Attribute] = child.output.map { a =>
 if (a semanticEquals eventTime) {
   val updatedMetadata = new MetadataBuilder()
-  .withMetadata(a.metadata)
-  .putLong(EventTimeWatermark.delayKey, delayMs)
-  .build()
-
+.withMetadata(a.metadata)
+.putLong(EventTimeWatermark.delayKey, delayMs)
+.build()
+  a.withMetadata(updatedMetadata)
+} else if (a.metadata.contains(EventTimeWatermark.delayKey)) {
+  // Remove existing watermark
+  val updatedMetadata = new MetadataBuilder()
+

spark git commit: [SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internal

2017-03-08 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 78cc5721f -> 00859e148


[SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internal

## What changes were proposed in this pull request?

The API docs should not include the "org.apache.spark.sql.internal" package 
because it contains internal, private APIs.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #17217 from zsxwing/SPARK-19874.

(cherry picked from commit 029e40b412e332c9f0fff283d604e203066c78c0)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/00859e14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/00859e14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/00859e14

Branch: refs/heads/branch-2.1
Commit: 00859e148fd1002fa314542953fee61a5d0fb9d9
Parents: 78cc572
Author: Shixiong Zhu 
Authored: Wed Mar 8 23:15:52 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Mar 8 23:16:00 2017 -0800

--
 project/SparkBuild.scala | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/00859e14/project/SparkBuild.scala
--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index e3fbe03..e772fa0 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -699,6 +699,7 @@ object Unidoc {
   
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/util/collection")))
   
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalyst")))
   
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution")))
+  
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal")))
   
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive/test")))
   }
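
A hedged sketch of the path-filtering idea (stand-in names, not the actual SparkBuild definitions): the Unidoc input is a list of source files, and internal packages are excluded by dropping any file whose canonical path contains the package directory.

```scala
import java.io.File

// Stand-in for the Unidoc source list; the package paths mirror the filters above,
// with org/apache/spark/sql/internal newly added by this commit.
val internalPackages = Seq(
  "org/apache/spark/sql/catalyst",
  "org/apache/spark/sql/execution",
  "org/apache/spark/sql/internal",
  "org/apache/spark/sql/hive/test"
)

def publicSources(allSources: Seq[File]): Seq[File] =
  allSources.filterNot(f => internalPackages.exists(pkg => f.getCanonicalPath.contains(pkg)))
```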
 





spark git commit: [SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internal

2017-03-08 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 09829be62 -> 029e40b41


[SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internal

## What changes were proposed in this pull request?

The API docs should not include the "org.apache.spark.sql.internal" package 
because it contains internal, private APIs.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #17217 from zsxwing/SPARK-19874.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/029e40b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/029e40b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/029e40b4

Branch: refs/heads/master
Commit: 029e40b412e332c9f0fff283d604e203066c78c0
Parents: 09829be
Author: Shixiong Zhu 
Authored: Wed Mar 8 23:15:52 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Mar 8 23:15:52 2017 -0800

--
 project/SparkBuild.scala | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/029e40b4/project/SparkBuild.scala
--
diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala
index 93a3189..e52baf5 100644
--- a/project/SparkBuild.scala
+++ b/project/SparkBuild.scala
@@ -655,6 +655,7 @@ object Unidoc {
   
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/util/collection")))
   
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/catalyst")))
   
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/execution")))
+  
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/internal")))
   
.map(_.filterNot(_.getCanonicalPath.contains("org/apache/spark/sql/hive/test")))
   }
 





spark git commit: [SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive Metastore

2017-03-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master d809ceed9 -> 09829be62


[SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive Metastore

### What changes were proposed in this pull request?
So far, the test cases in DDLSuite only verify the behaviors of 
InMemoryCatalog; they do not cover the scenarios that use HiveExternalCatalog. 
Thus, we need to improve the existing test suite so that these cases also run 
against the Hive metastore.

While porting these test cases, a bug in `SET LOCATION` was found: `path` is 
not updated when the location is changed.

This PR makes the following changes:
- `DDLSuite` becomes an abstract class. Both `InMemoryCatalogedDDLSuite` and 
`HiveCatalogedDDLSuite` extend it; `InMemoryCatalogedDDLSuite` uses 
`InMemoryCatalog`, while `HiveCatalogedDDLSuite` uses `HiveExternalCatalog`.
- `InMemoryCatalogedDDLSuite` contains all the existing test cases in 
`DDLSuite`.
- `HiveCatalogedDDLSuite` contains a subset of `DDLSuite`. The following test 
cases are excluded:

1. The following test cases only make sense for `InMemoryCatalog`:
```
  test("desc table for parquet data source table using in-memory catalog")
  test("create a managed Hive source table") {
  test("create an external Hive source table")
  test("Create Hive Table As Select")
```

2. The following test cases cannot be ported because we are unable to alter the 
table provider when using the Hive metastore. In future PRs, we need to improve 
the test cases so that altering the table provider is not needed:
```
  test("alter table: set location (datasource table)")
  test("alter table: set properties (datasource table)")
  test("alter table: unset properties (datasource table)")
  test("alter table: set serde (datasource table)")
  test("alter table: set serde partition (datasource table)")
  test("alter table: change column (datasource table)")
  test("alter table: add partition (datasource table)")
  test("alter table: drop partition (datasource table)")
  test("alter table: rename partition (datasource table)")
  test("drop table - data source table")
```

**TODO**: in future PRs, we need to remove `HiveDDLSuite` and move its test 
cases to `DDLSuite`, `InMemoryCatalogedDDLSuite`, or `HiveCatalogedDDLSuite`.
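
A simplified sketch of the abstract-suite pattern described above (illustrative names only, assuming plain ScalaTest rather than Spark's own test helpers):

```scala
import org.scalatest.FunSuite  // ScalaTest 3.0.x style, as used by Spark at the time

// Shared DDL test cases live in an abstract suite; each concrete subclass picks
// the catalog implementation it runs against.
abstract class AbstractCatalogedDDLSuite extends FunSuite {
  protected def catalogImplementation: String  // "in-memory" or "hive"

  test("create and drop a database") {
    // ...run DDL against a session configured with `catalogImplementation`...
    assert(Seq("in-memory", "hive").contains(catalogImplementation))
  }
}

class InMemorySuite extends AbstractCatalogedDDLSuite {
  override protected def catalogImplementation: String = "in-memory"
}

class HiveSuite extends AbstractCatalogedDDLSuite {
  override protected def catalogImplementation: String = "hive"
}
```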

### How was this patch tested?
N/A

Author: Xiao Li 
Author: gatorsmile 

Closes #16592 from gatorsmile/refactorDDLSuite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/09829be6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/09829be6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/09829be6

Branch: refs/heads/master
Commit: 09829be621f0f9bb5076abb3d832925624699fa9
Parents: d809cee
Author: Xiao Li 
Authored: Wed Mar 8 23:12:10 2017 -0800
Committer: Wenchen Fan 
Committed: Wed Mar 8 23:12:10 2017 -0800

--
 .../spark/sql/execution/command/DDLSuite.scala  | 456 +++
 .../apache/spark/sql/test/SQLTestUtils.scala|   5 +
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 157 +++
 3 files changed, 345 insertions(+), 273 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/09829be6/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
index c1f8b2b..aa335c4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DDLSuite.scala
@@ -30,23 +30,164 @@ import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.CATALOG_IMPLEMENTATION
-import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
-class DDLSuite extends QueryTest with SharedSQLContext with BeforeAndAfterEach 
{
-  private val escapedIdentifier = "`(.+)`".r
 
+class InMemoryCatalogedDDLSuite extends DDLSuite with SharedSQLContext with 
BeforeAndAfterEach {
   override def afterEach(): Unit = {
 try {
   // drop all databases, tables and functions after each test
   spark.sessionState.catalog.reset()
 } finally {
-  Utils.deleteRecursively(new File("spark-warehouse"))
+  Utils.deleteRecursively(new File(spark.sessionState.conf.warehousePath))
   super.afterEach()
 }
   }
 
+  

spark git commit: [MINOR][SQL] The analyzer rules are fired twice for cases when AnalysisException is raised from analyzer.

2017-03-08 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 3457c3229 -> 78cc5721f


[MINOR][SQL] The analyzer rules are fired twice for cases when 
AnalysisException is raised from analyzer.

## What changes were proposed in this pull request?
In general, we have a checkAnalysis phase which validates the logical plan and 
throws AnalysisException on semantic errors. However, we can also throw 
AnalysisException from a few analyzer rules, such as ResolveSubquery.

I found that we fire the analyzer rules twice for queries that throw 
AnalysisException from one of the analyzer rules. This is a very minor fix; we 
don't have to strictly fix it. I just got confused seeing the rules fire twice 
when I was not expecting it.
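
The fix boils down to forcing the lazily analyzed plan before entering the try block, so that referencing it again inside the catch handler does not re-run the rules. A small stand-alone sketch of the idiom (illustrative names, not Spark's classes):

```scala
class QueryLike(plan: String) {
  // Stand-in for QueryExecution.analyzed: expensive rule execution happens once.
  lazy val analyzed: String = {
    println(s"analyzing $plan")
    plan.toUpperCase
  }

  def assertAnalyzed(): Unit = {
    analyzed                      // forced here, outside the try block
    try {
      require(!analyzed.contains("BAD"), s"invalid plan: $analyzed")
    } catch {
      case e: IllegalArgumentException =>
        // `analyzed` is already materialized; no second analysis pass is triggered.
        throw new RuntimeException(e.getMessage, e)
    }
  }
}
```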

## How was this patch tested?

Tested manually.

Author: Dilip Biswal 

Closes #17214 from dilipbiswal/analyis_twice.

(cherry picked from commit d809ceed9762d5bbb04170e45f38751713112dd8)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/78cc5721
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/78cc5721
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/78cc5721

Branch: refs/heads/branch-2.1
Commit: 78cc5721f07af5c561e89d1bbc72975bb67abb74
Parents: 3457c32
Author: Dilip Biswal 
Authored: Wed Mar 8 17:33:49 2017 -0800
Committer: Xiao Li 
Committed: Wed Mar 8 17:34:05 2017 -0800

--
 .../org/apache/spark/sql/execution/QueryExecution.scala | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/78cc5721/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index b3ef29f..9b53d21 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -45,9 +45,14 @@ class QueryExecution(val sparkSession: SparkSession, val 
logical: LogicalPlan) {
   protected def planner = sparkSession.sessionState.planner
 
   def assertAnalyzed(): Unit = {
-try sparkSession.sessionState.analyzer.checkAnalysis(analyzed) catch {
+// Analyzer is invoked outside the try block to avoid calling it again 
from within the
+// catch block below.
+analyzed
+try {
+  sparkSession.sessionState.analyzer.checkAnalysis(analyzed)
+} catch {
   case e: AnalysisException =>
-val ae = new AnalysisException(e.message, e.line, e.startPosition, 
Some(analyzed))
+val ae = new AnalysisException(e.message, e.line, e.startPosition, 
Option(analyzed))
 ae.setStackTrace(e.getStackTrace)
 throw ae
 }





spark git commit: [MINOR][SQL] The analyzer rules are fired twice for cases when AnalysisException is raised from analyzer.

2017-03-08 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master a3648b5d4 -> d809ceed9


[MINOR][SQL] The analyzer rules are fired twice for cases when 
AnalysisException is raised from analyzer.

## What changes were proposed in this pull request?
In general, we have a checkAnalysis phase which validates the logical plan and 
throws AnalysisException on semantic errors. However, we can also throw 
AnalysisException from a few analyzer rules, such as ResolveSubquery.

I found that we fire the analyzer rules twice for queries that throw 
AnalysisException from one of the analyzer rules. This is a very minor fix; we 
don't have to strictly fix it. I just got confused seeing the rules fire twice 
when I was not expecting it.

## How was this patch tested?

Tested manually.

Author: Dilip Biswal 

Closes #17214 from dilipbiswal/analyis_twice.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d809ceed
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d809ceed
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d809ceed

Branch: refs/heads/master
Commit: d809ceed9762d5bbb04170e45f38751713112dd8
Parents: a3648b5
Author: Dilip Biswal 
Authored: Wed Mar 8 17:33:49 2017 -0800
Committer: Xiao Li 
Committed: Wed Mar 8 17:33:49 2017 -0800

--
 .../org/apache/spark/sql/execution/QueryExecution.scala | 9 +++--
 1 file changed, 7 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d809ceed/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
index 6ec2f4d..9a3656d 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/QueryExecution.scala
@@ -46,9 +46,14 @@ class QueryExecution(val sparkSession: SparkSession, val 
logical: LogicalPlan) {
   protected def planner = sparkSession.sessionState.planner
 
   def assertAnalyzed(): Unit = {
-try sparkSession.sessionState.analyzer.checkAnalysis(analyzed) catch {
+// Analyzer is invoked outside the try block to avoid calling it again 
from within the
+// catch block below.
+analyzed
+try {
+  sparkSession.sessionState.analyzer.checkAnalysis(analyzed)
+} catch {
   case e: AnalysisException =>
-val ae = new AnalysisException(e.message, e.line, e.startPosition, 
Some(analyzed))
+val ae = new AnalysisException(e.message, e.line, e.startPosition, 
Option(analyzed))
 ae.setStackTrace(e.getStackTrace)
 throw ae
 }





spark git commit: Revert "[SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations for branch-2.1"

2017-03-08 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 f6c1ad2eb -> 3457c3229


Revert "[SPARK-19413][SS] MapGroupsWithState for arbitrary stateful operations 
for branch-2.1"

This reverts commit 502c927b8c8a99ef2adf4e6e1d7a6d9232d45ef5.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3457c322
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3457c322
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3457c322

Branch: refs/heads/branch-2.1
Commit: 3457c32297e0150a4fbc80a30f84b9c62ca7c372
Parents: f6c1ad2
Author: Shixiong Zhu 
Authored: Wed Mar 8 14:30:54 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Mar 8 14:41:29 2017 -0800

--
 .../analysis/UnsupportedOperationChecker.scala  |  11 +-
 .../sql/catalyst/plans/logical/object.scala |  49 ---
 .../analysis/UnsupportedOperationsSuite.scala   |  24 +-
 .../FlatMapGroupsWithStateFunction.java |  38 ---
 .../function/MapGroupsWithStateFunction.java|  38 ---
 .../spark/sql/KeyValueGroupedDataset.scala  | 113 ---
 .../scala/org/apache/spark/sql/KeyedState.scala | 142 
 .../spark/sql/execution/SparkStrategies.scala   |  21 +-
 .../apache/spark/sql/execution/objects.scala|  22 --
 .../streaming/IncrementalExecution.scala|  19 +-
 .../execution/streaming/KeyedStateImpl.scala|  80 -
 .../execution/streaming/ProgressReporter.scala  |   2 +-
 .../execution/streaming/StatefulAggregate.scala | 237 +
 .../state/HDFSBackedStateStoreProvider.scala|  19 --
 .../execution/streaming/state/StateStore.scala  |   5 -
 .../sql/execution/streaming/state/package.scala |  11 +-
 .../execution/streaming/statefulOperators.scala | 323 --
 .../org/apache/spark/sql/JavaDatasetSuite.java  |  32 --
 .../sql/streaming/MapGroupsWithStateSuite.scala | 335 ---
 19 files changed, 249 insertions(+), 1272 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3457c322/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
index d8aad42..f4d016c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/UnsupportedOperationChecker.scala
@@ -46,13 +46,8 @@ object UnsupportedOperationChecker {
 "Queries without streaming sources cannot be executed with 
writeStream.start()")(plan)
 }
 
-/** Collect all the streaming aggregates in a sub plan */
-def collectStreamingAggregates(subplan: LogicalPlan): Seq[Aggregate] = {
-  subplan.collect { case a: Aggregate if a.isStreaming => a }
-}
-
 // Disallow multiple streaming aggregations
-val aggregates = collectStreamingAggregates(plan)
+val aggregates = plan.collect { case a@Aggregate(_, _, _) if a.isStreaming 
=> a }
 
 if (aggregates.size > 1) {
   throwError(
@@ -119,10 +114,6 @@ object UnsupportedOperationChecker {
 case _: InsertIntoTable =>
   throwError("InsertIntoTable is not supported with streaming 
DataFrames/Datasets")
 
-case m: MapGroupsWithState if collectStreamingAggregates(m).nonEmpty =>
-  throwError("(map/flatMap)GroupsWithState is not supported after 
aggregation on a " +
-"streaming DataFrame/Dataset")
-
 case Join(left, right, joinType, _) =>
 
   joinType match {

http://git-wip-us.apache.org/repos/asf/spark/blob/3457c322/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
index 0be4823..0ab4c90 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/object.scala
@@ -313,55 +313,6 @@ case class MapGroups(
 outputObjAttr: Attribute,
 child: LogicalPlan) extends UnaryNode with ObjectProducer
 
-/** Internal class representing State */
-trait LogicalKeyedState[S]
-
-/** Factory for constructing new `MapGroupsWithState` nodes. */
-object MapGroupsWithState {
-  def apply[K: Encoder, V: Encoder, S: Encoder, U: Encoder](
-  func: (Any, Iterator[Any], LogicalKeyedState[Any]) 

spark git commit: [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource

2017-03-08 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 455129020 -> a3648b5d4


[SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in 
combination with maxFileAge in FileStreamSource

## What changes were proposed in this pull request?

**The Problem**
There is a file stream source option called maxFileAge which limits how old 
files can be, relative to the latest file that has been seen. This is used to 
limit the files that need to be remembered as "processed"; files older than the 
latest processed files are ignored. This value defaults to 7 days.
This causes a problem when both
latestFirst = true
maxFilesPerTrigger > total files to be processed.
Here is what happens in all combinations:
1) latestFirst = false - Since files are processed in order, there won't be any 
unprocessed file older than the latest processed file. All files will be 
processed.
2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge 
thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is 
not set, then all old files get processed in the first batch, and so no file is 
left behind.
3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch 
processes the latest X files. That sets the threshold to (latest file - 
maxFileAge), so files older than this threshold will never be considered for 
processing.
The bug is with case 3.

**The Solution**

Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set.
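
A small sketch of the resulting guard (simplified; the real logic lives in FileStreamOptions/FileStreamSource, and the field names here are illustrative):

```scala
// Simplified model of the options involved.
case class SourceOptions(
    latestFirst: Boolean,
    maxFilesPerTrigger: Option[Int],
    maxFileAgeMs: Long)

// When the latest files are processed first AND the files per trigger are capped,
// age-based filtering is effectively disabled so valid old files are not dropped.
def effectiveMaxFileAgeMs(opts: SourceOptions): Long =
  if (opts.latestFirst && opts.maxFilesPerTrigger.isDefined) Long.MaxValue
  else opts.maxFileAgeMs
```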

## How was this patch tested?

Regression test in `FileStreamSourceSuite`

Author: Burak Yavuz 

Closes #17153 from brkyvz/maxFileAge.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3648b5d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3648b5d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3648b5d

Branch: refs/heads/master
Commit: a3648b5d4f99ff9461d02f53e9ec71787a3abf51
Parents: 4551290
Author: Burak Yavuz 
Authored: Wed Mar 8 14:35:07 2017 -0800
Committer: Burak Yavuz 
Committed: Wed Mar 8 14:35:07 2017 -0800

--
 .../execution/streaming/FileStreamOptions.scala |  5 +-
 .../execution/streaming/FileStreamSource.scala  | 14 +++-
 .../sql/streaming/FileStreamSourceSuite.scala   | 82 
 3 files changed, 63 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a3648b5d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 2f802d7..e7ba901 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -38,7 +38,10 @@ class FileStreamOptions(parameters: 
CaseInsensitiveMap[String]) extends Logging
   }
 
   /**
-   * Maximum age of a file that can be found in this directory, before it is 
deleted.
+   * Maximum age of a file that can be found in this directory, before it is 
ignored. For the
+   * first batch all files will be considered valid. If `latestFirst` is set 
to `true` and
+   * `maxFilesPerTrigger` is set, then this parameter will be ignored, because 
old files that are
+   * valid, and should be processed, may be ignored. Please refer to 
SPARK-19813 for details.
*
* The max age is specified with respect to the timestamp of the latest 
file, and not the
* timestamp of the current system. That this means if the last file has 
timestamp 1000, and the

http://git-wip-us.apache.org/repos/asf/spark/blob/a3648b5d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 6a7263c..0f09b0a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -66,23 +66,29 @@ class FileStreamSource(
 
   private val fileSortOrder = if (sourceOptions.latestFirst) {
   logWarning(
-"""'latestFirst' is true. New files will be processed first.
-  |It may affect the watermark value""".stripMargin)
+"""'latestFirst' is true. New files will be processed first, which may 
affect the watermark
+  |value. In addition, 

spark git commit: [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in combination with maxFileAge in FileStreamSource

2017-03-08 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 320eff14b -> f6c1ad2eb


[SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in 
combination with maxFileAge in FileStreamSource

## What changes were proposed in this pull request?

**The Problem**
There is a file stream source option called maxFileAge which limits how old 
files can be, relative to the latest file that has been seen. This is used to 
limit the files that need to be remembered as "processed"; files older than the 
latest processed files are ignored. This value defaults to 7 days.
This causes a problem when both
latestFirst = true
maxFilesPerTrigger > total files to be processed.
Here is what happens in all combinations:
1) latestFirst = false - Since files are processed in order, there won't be any 
unprocessed file older than the latest processed file. All files will be 
processed.
2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge 
thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is 
not set, then all old files get processed in the first batch, and so no file is 
left behind.
3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch 
processes the latest X files. That sets the threshold to (latest file - 
maxFileAge), so files older than this threshold will never be considered for 
processing.
The bug is with case 3.

**The Solution**

Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are set.

## How was this patch tested?

Regression test in `FileStreamSourceSuite`

Author: Burak Yavuz 

Closes #17153 from brkyvz/maxFileAge.

(cherry picked from commit a3648b5d4f99ff9461d02f53e9ec71787a3abf51)
Signed-off-by: Burak Yavuz 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f6c1ad2e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f6c1ad2e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f6c1ad2e

Branch: refs/heads/branch-2.1
Commit: f6c1ad2eb6d0706899aabbdd39e558b3488e2ef3
Parents: 320eff1
Author: Burak Yavuz 
Authored: Wed Mar 8 14:35:07 2017 -0800
Committer: Burak Yavuz 
Committed: Wed Mar 8 14:35:22 2017 -0800

--
 .../execution/streaming/FileStreamOptions.scala |  5 +-
 .../execution/streaming/FileStreamSource.scala  | 14 +++-
 .../sql/streaming/FileStreamSourceSuite.scala   | 82 
 3 files changed, 63 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f6c1ad2e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
index 25ebe17..fe64838 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamOptions.scala
@@ -38,7 +38,10 @@ class FileStreamOptions(parameters: CaseInsensitiveMap) 
extends Logging {
   }
 
   /**
-   * Maximum age of a file that can be found in this directory, before it is 
deleted.
+   * Maximum age of a file that can be found in this directory, before it is 
ignored. For the
+   * first batch all files will be considered valid. If `latestFirst` is set 
to `true` and
+   * `maxFilesPerTrigger` is set, then this parameter will be ignored, because 
old files that are
+   * valid, and should be processed, may be ignored. Please refer to 
SPARK-19813 for details.
*
* The max age is specified with respect to the timestamp of the latest 
file, and not the
* timestamp of the current system. That this means if the last file has 
timestamp 1000, and the

http://git-wip-us.apache.org/repos/asf/spark/blob/f6c1ad2e/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
index 39c0b49..0f0b6f1 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala
@@ -64,23 +64,29 @@ class FileStreamSource(
 
   private val fileSortOrder = if (sourceOptions.latestFirst) {
   logWarning(
-"""'latestFirst' is true. New files will be processed first.
-  |It may affect the watermark value""".stripMargin)
+

spark git commit: [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV

2017-03-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 6570cfd7a -> 455129020


[SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] storing CSV

## What changes were proposed in this pull request?

This PR proposes to add an API that loads a `DataFrame` from a `Dataset[String]` 
storing CSV rows.

It allows pre-processing before parsing the CSV, which enables workarounds for 
many narrow cases, for example, as below:

- Case 1 - pre-processing

  ```scala
  val df = spark.read.text("...")
  // Pre-processing with this.
  spark.read.csv(df.as[String])
  ```

- Case 2 - use other input formats

  ```scala
  val rdd = spark.sparkContext.newAPIHadoopFile("/file.csv.lzo",
classOf[com.hadoop.mapreduce.LzoTextInputFormat],
classOf[org.apache.hadoop.io.LongWritable],
classOf[org.apache.hadoop.io.Text])
  val stringRdd = rdd.map(pair => new String(pair._2.getBytes, 0, 
pair._2.getLength))

  spark.read.csv(stringRdd.toDS)
  ```
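
A further hedged usage sketch combining schema specification with pre-processing, assuming a `spark` session in scope (e.g. in spark-shell); the path, column names, and comment convention are hypothetical:

  ```scala
  import org.apache.spark.sql.types._

  // Hypothetical input path and columns, for illustration only.
  val raw = spark.read.textFile("/path/to/data.csv")        // Dataset[String]
  val cleaned = raw.filter(line => !line.startsWith("#"))   // drop comment lines

  val schema = new StructType().add("id", IntegerType).add("name", StringType)
  val df = spark.read.schema(schema).option("header", "true").csv(cleaned)
  ```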

## How was this patch tested?

Added tests in `CSVSuite` and built with Scala 2.10.

```
./dev/change-scala-version.sh 2.10
./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
```

Author: hyukjinkwon 

Closes #16854 from HyukjinKwon/SPARK-15463.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/45512902
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/45512902
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/45512902

Branch: refs/heads/master
Commit: 455129020ca7f6a162f6f2486a87cc43512cfd2c
Parents: 6570cfd
Author: hyukjinkwon 
Authored: Wed Mar 8 13:43:09 2017 -0800
Committer: Wenchen Fan 
Committed: Wed Mar 8 13:43:09 2017 -0800

--
 .../org/apache/spark/sql/DataFrameReader.scala  | 71 +---
 .../datasources/csv/CSVDataSource.scala | 49 --
 .../execution/datasources/csv/CSVOptions.scala  |  2 +-
 .../datasources/csv/UnivocityParser.scala   |  2 +-
 .../execution/datasources/csv/CSVSuite.scala| 27 
 5 files changed, 121 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/45512902/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
index 41470ae..a5e38e2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala
@@ -29,6 +29,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.json.{CreateJacksonParser, JacksonParser, 
JSONOptions}
 import org.apache.spark.sql.execution.LogicalRDD
 import org.apache.spark.sql.execution.command.DDLUtils
+import org.apache.spark.sql.execution.datasources.csv._
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.jdbc._
 import org.apache.spark.sql.execution.datasources.json.JsonInferSchema
@@ -368,14 +369,7 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
 createParser)
 }
 
-// Check a field requirement for corrupt records here to throw an 
exception in a driver side
-schema.getFieldIndex(parsedOptions.columnNameOfCorruptRecord).foreach { 
corruptFieldIndex =>
-  val f = schema(corruptFieldIndex)
-  if (f.dataType != StringType || !f.nullable) {
-throw new AnalysisException(
-  "The field for corrupt records must be string type and nullable")
-  }
-}
+verifyColumnNameOfCorruptRecord(schema, 
parsedOptions.columnNameOfCorruptRecord)
 
 val parsed = jsonDataset.rdd.mapPartitions { iter =>
   val parser = new JacksonParser(schema, parsedOptions)
@@ -399,6 +393,51 @@ class DataFrameReader private[sql](sparkSession: 
SparkSession) extends Logging {
   }
 
   /**
+   * Loads an `Dataset[String]` storing CSV rows and returns the result as a 
`DataFrame`.
+   *
+   * If the schema is not specified using `schema` function and `inferSchema` 
option is enabled,
+   * this function goes through the input once to determine the input schema.
+   *
+   * If the schema is not specified using `schema` function and `inferSchema` 
option is disabled,
+   * it determines the columns as string types and it reads only the first 
line to determine the
+   * names and the number of fields.
+   *
+   * @param csvDataset input Dataset with one CSV row per record
+   * @since 2.2.0
+   */
+  def csv(csvDataset: Dataset[String]): DataFrame = {
+val parsedOptions: CSVOptions = new CSVOptions(
+  extraOptions.toMap,
+  

spark git commit: [SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow invalid cases

2017-03-08 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master e9e2c612d -> 1bf901238


[SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow 
invalid cases

## What changes were proposed in this pull request?

Add an output mode parameter to `flatMapGroupsWithState` and define 
`mapGroupsWithState` as `flatMapGroupsWithState(Update)`.

`UnsupportedOperationChecker` is modified to disallow unsupported cases.

- Batch mapGroupsWithState or flatMapGroupsWithState is always allowed.
- For streaming (map/flatMap)GroupsWithState, see the following table:

| Operators | Supported Query Output Mode |
| --------- | --------------------------- |
| flatMapGroupsWithState(Update) without aggregation | Update |
| flatMapGroupsWithState(Update) with aggregation | None |
| flatMapGroupsWithState(Append) without aggregation | Append |
| flatMapGroupsWithState(Append) before aggregation | Append, Update, Complete |
| flatMapGroupsWithState(Append) after aggregation | None |
| Multiple flatMapGroupsWithState(Append)s | Append |
| Multiple mapGroupsWithStates | None |
| Mixing mapGroupsWithStates and flatMapGroupsWithStates | None |
| Other cases of multiple flatMapGroupsWithState | None |
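
For reference, a hedged usage sketch of `flatMapGroupsWithState` with an explicit output mode, written against the API as eventually released in Spark 2.2.0 (at the time of this commit the state class was still named KeyedState, and the timeout parameter was added in a later PR); the Event type and counting logic are illustrative:

```scala
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

case class Event(user: String, action: String)

// `events` is assumed to be a streaming Dataset[Event] built elsewhere.
def runningCounts(events: Dataset[Event]): Dataset[(String, Long)] = {
  import events.sparkSession.implicits._
  events
    .groupByKey(_.user)
    .flatMapGroupsWithState[Long, (String, Long)](OutputMode.Update, GroupStateTimeout.NoTimeout) {
      (user: String, newEvents: Iterator[Event], state: GroupState[Long]) =>
        val newCount = state.getOption.getOrElse(0L) + newEvents.size
        state.update(newCount)
        // Update output mode: emit the updated count for this key (see the table above).
        Iterator((user, newCount))
    }
}
```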

## How was this patch tested?

The added unit tests. Here are the tests related to 
(map/flatMap)GroupsWithState:
```
[info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Append) 
on batch relation: supported (1 millisecond)
[info] - batch plan - flatMapGroupsWithState - multiple 
flatMapGroupsWithState(Append)s on batch relation: supported (0 milliseconds)
[info] - batch plan - flatMapGroupsWithState - flatMapGroupsWithState(Update) 
on batch relation: supported (0 milliseconds)
[info] - batch plan - flatMapGroupsWithState - multiple 
flatMapGroupsWithState(Update)s on batch relation: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation without aggregation in 
update mode: supported (2 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation without aggregation in 
append mode: not supported (7 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation without aggregation in 
complete mode: not supported (5 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation with aggregation in Append 
mode: not supported (11 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation with aggregation in Update 
mode: not supported (5 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation with aggregation in 
Complete mode: not supported (5 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation without aggregation in 
append mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation without aggregation in 
update mode: not supported (6 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation before aggregation in 
Append mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation before aggregation in 
Update mode: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation before aggregation in 
Complete mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation after aggregation in 
Append mode: not supported (6 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation after aggregation in 
Update mode: not supported (4 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation in complete mode: not 
supported (2 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on batch relation inside streaming relation in 
Append output mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on batch relation inside streaming relation in 
Update output mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on batch relation inside streaming relation in 
Append output mode: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on batch relation inside streaming relation in 
Update output mode: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - multiple 

spark git commit: [SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session has an identical copy of the SessionState

2017-03-08 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 1bf901238 -> 6570cfd7a


[SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session has 
an identical copy of the SessionState

Forking a newSession() from SparkSession currently makes a new SparkSession 
that does not retain SessionState (i.e. temporary tables, SQL config, 
registered functions, etc.). This change adds a method cloneSession() which 
creates a new SparkSession with a copy of the parent's SessionState.

Subsequent changes to the base session are not propagated to the cloned 
session; the clone is independent after creation. If the base is changed after 
the clone has been created, say the user registers a new UDF, then the new UDF 
will not be available inside the clone. The same goes for configs and temp 
tables.
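
A conceptual sketch of the isolation semantics described above (note that cloneSession() is an internal, package-private API, so this is illustrative pseudo-usage rather than end-user code):

```scala
// Conceptual only: cloneSession() is private[sql], so application code cannot
// call it directly; the snippet just illustrates the intended semantics.
val base = spark                                   // an existing SparkSession
base.conf.set("spark.sql.shuffle.partitions", "8")

val cloned = base.cloneSession()                   // copies SessionState at this point
assert(cloned.conf.get("spark.sql.shuffle.partitions") == "8")

// Later changes to the base are NOT visible in the clone, and vice versa:
base.udf.register("plusOne", (x: Int) => x + 1)    // available only in `base`
```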

Unit tests

Author: Kunal Khamar 
Author: Shixiong Zhu 

Closes #16826 from kunalkhamar/fork-sparksession.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6570cfd7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6570cfd7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6570cfd7

Branch: refs/heads/master
Commit: 6570cfd7abe349dc6d2151f2ac9dc662e7465a79
Parents: 1bf9012
Author: Kunal Khamar 
Authored: Wed Mar 8 13:06:22 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Mar 8 13:20:45 2017 -0800

--
 .../spark/sql/catalyst/CatalystConf.scala   |   7 +-
 .../catalyst/analysis/FunctionRegistry.scala|   5 +-
 .../sql/catalyst/catalog/SessionCatalog.scala   |  38 ++-
 .../catalyst/catalog/SessionCatalogSuite.scala  |  55 
 .../apache/spark/sql/ExperimentalMethods.scala  |   6 +
 .../org/apache/spark/sql/SparkSession.scala |  59 -
 .../spark/sql/execution/datasources/rules.scala |   3 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |   8 +
 .../spark/sql/internal/SessionState.scala   | 235 +++--
 .../apache/spark/sql/SessionStateSuite.scala| 162 
 .../spark/sql/internal/CatalogSuite.scala   |  21 +-
 .../spark/sql/internal/SQLConfEntrySuite.scala  |  18 ++
 .../apache/spark/sql/test/TestSQLContext.scala  |  20 +-
 .../spark/sql/hive/HiveMetastoreCatalog.scala   |   5 +-
 .../spark/sql/hive/HiveSessionCatalog.scala |  92 +--
 .../spark/sql/hive/HiveSessionState.scala   | 261 ++-
 .../spark/sql/hive/client/HiveClientImpl.scala  |   2 +
 .../apache/spark/sql/hive/test/TestHive.scala   |  67 ++---
 .../sql/hive/HiveSessionCatalogSuite.scala  | 112 
 .../spark/sql/hive/HiveSessionStateSuite.scala  |  41 +++
 20 files changed, 981 insertions(+), 236 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6570cfd7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index fb99cb2..cff0efa 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -66,6 +66,8 @@ trait CatalystConf {
 
   /** The maximum number of joined nodes allowed in the dynamic programming 
algorithm. */
   def joinReorderDPThreshold: Int
+
+  override def clone(): CatalystConf = throw new CloneNotSupportedException()
 }
 
 
@@ -85,4 +87,7 @@ case class SimpleCatalystConf(
 joinReorderDPThreshold: Int = 12,
 warehousePath: String = "/user/hive/warehouse",
 sessionLocalTimeZone: String = TimeZone.getDefault().getID)
-  extends CatalystConf
+  extends CatalystConf {
+
+  override def clone(): SimpleCatalystConf = this.copy()
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/6570cfd7/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 556fa99..0dcb440 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -64,6 +64,8 @@ trait FunctionRegistry {
   /** Clear all registered functions. */
   def clear(): Unit
 
+  /** Create a copy of this registry with identical functions as this 
registry. */
+  override def clone(): FunctionRegistry = throw new 
CloneNotSupportedException()
 }
 
 class SimpleFunctionRegistry extends 

spark git commit: [SPARK-19481] [REPL] [MAVEN] Avoid to leak SparkContext in Signaling.cancelOnInterrupt

2017-03-08 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 da3dfafa9 -> c561e6cfa


[SPARK-19481] [REPL] [MAVEN] Avoid to leak SparkContext in 
Signaling.cancelOnInterrupt

## What changes were proposed in this pull request?

`Signaling.cancelOnInterrupt` leaks a SparkContext per call and it makes 
ReplSuite unstable.

This PR adds `SparkContext.getActive` to allow `Signaling.cancelOnInterrupt` to 
get the active `SparkContext` to avoid the leak.
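
A stand-alone sketch of the leak and the fix pattern (stand-in types; the real code is in the repl's Signaling utility and the new private[spark] SparkContext.getActive):

```scala
// Stand-in for SparkContext; only the part relevant to the leak is modeled.
class Context { def cancelAllJobs(): Unit = println("jobs cancelled") }

object Registry {
  @volatile private var active: Option[Context] = None
  def setActive(c: Context): Unit = { active = Some(c) }
  def clearActive(): Unit = { active = None }
  def getActive: Option[Context] = active
}

// Leaky variant: the interrupt handler closes over one specific context,
// keeping it (and everything it references) alive for the lifetime of the JVM.
def registerLeaky(ctx: Context)(installHandler: (() => Unit) => Unit): Unit =
  installHandler(() => ctx.cancelAllJobs())

// Fixed variant: the handler resolves the *currently* active context at signal
// time, so no particular context instance is captured.
def registerFixed(installHandler: (() => Unit) => Unit): Unit =
  installHandler(() => Registry.getActive.foreach(_.cancelAllJobs()))
```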

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16825 from zsxwing/SPARK-19481.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c561e6cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c561e6cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c561e6cf

Branch: refs/heads/branch-2.0
Commit: c561e6cfaf8e67a58fa79a1d7284b779fee4e79f
Parents: da3dfaf
Author: Shixiong Zhu 
Authored: Thu Feb 9 11:16:51 2017 -0800
Committer: Shixiong Zhu 
Committed: Wed Mar 8 12:49:53 2017 -0800

--
 .../scala/org/apache/spark/SparkContext.scala   |  7 +++
 .../main/scala/org/apache/spark/repl/Main.scala |  1 +
 .../org/apache/spark/repl/SparkILoop.scala  |  1 -
 .../main/scala/org/apache/spark/repl/Main.scala |  2 +-
 .../scala/org/apache/spark/repl/Signaling.scala | 20 +++-
 5 files changed, 20 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/core/src/main/scala/org/apache/spark/SparkContext.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala 
b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2abe444..daef497 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -2295,6 +2295,13 @@ object SparkContext extends Logging {
 getOrCreate(new SparkConf())
   }
 
+  /** Return the current active [[SparkContext]] if any. */
+  private[spark] def getActive: Option[SparkContext] = {
+SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
+  Option(activeContext.get())
+}
+  }
+
   /**
* Called at the beginning of the SparkContext constructor to ensure that no 
SparkContext is
* running.  Throws an exception if a running context is detected and logs a 
warning if another

http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
--
diff --git a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala 
b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
index 7b4e14b..fba321b 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/Main.scala
@@ -22,6 +22,7 @@ import org.apache.spark.internal.Logging
 object Main extends Logging {
 
   initializeLogIfNecessary(true)
+  Signaling.cancelOnInterrupt()
 
   private var _interp: SparkILoop = _
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
--
diff --git 
a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala 
b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
index e017aa4..b7237a6 100644
--- a/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
+++ b/repl/scala-2.10/src/main/scala/org/apache/spark/repl/SparkILoop.scala
@@ -1027,7 +1027,6 @@ class SparkILoop(
   builder.getOrCreate()
 }
 sparkContext = sparkSession.sparkContext
-Signaling.cancelOnInterrupt(sparkContext)
 sparkSession
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/c561e6cf/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
--
diff --git a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala 
b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
index 5dfe18a..13b772b 100644
--- a/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
+++ b/repl/scala-2.11/src/main/scala/org/apache/spark/repl/Main.scala
@@ -30,6 +30,7 @@ import org.apache.spark.util.Utils
 object Main extends Logging {
 
   initializeLogIfNecessary(true)
+  Signaling.cancelOnInterrupt()
 
   val conf = new SparkConf()
   val rootDir = 
conf.getOption("spark.repl.classdir").getOrElse(Utils.getLocalDir(conf))
@@ -108,7 +109,6 @@ object Main extends Logging {
   logInfo("Created Spark session")
 }
 sparkContext = sparkSession.sparkContext
-

spark git commit: [SPARK-19727][SQL] Fix for round function that modifies original column

2017-03-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master f3387d974 -> e9e2c612d


[SPARK-19727][SQL] Fix for round function that modifies original column

## What changes were proposed in this pull request?

Fix for the SQL `round` function, which modified the original column when the underlying DataFrame was created from a local product (e.g. a `Seq` of case classes).

import org.apache.spark.sql.functions._

case class NumericRow(value: BigDecimal)

val df = spark.createDataFrame(Seq(NumericRow(BigDecimal("1.23456789"))))

df.show()
+------------+
|       value|
+------------+
|1.2345678900|
+------------+

df.withColumn("value_rounded", round('value)).show()

// before
+------------+-------------+
|       value|value_rounded|
+------------+-------------+
|        1.00|            1|
+------------+-------------+

// after
+------------+-------------+
|       value|value_rounded|
+------------+-------------+
|1.2345678900|            1|
+------------+-------------+
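
For context, a minimal self-contained sketch of the underlying hazard, using a hypothetical `SimpleDecimal` rather than the real `org.apache.spark.sql.types.Decimal`: an in-place `changePrecision` mutates a value that the source rows still reference, while a copy-then-change `toPrecision` (the approach taken in this patch) leaves the original untouched.

```scala
import scala.math.BigDecimal.RoundingMode

// Hypothetical stand-in for a mutable decimal value shared with the source rows.
case class SimpleDecimal(var value: BigDecimal) {
  // Mutating API: rounds the receiver in place (the hazard).
  def changePrecision(precision: Int, scale: Int): Boolean = {
    value = value.setScale(scale, RoundingMode.HALF_UP)
    value.precision <= precision
  }
  // Non-mutating API: round a copy and leave the receiver untouched.
  def toPrecision(precision: Int, scale: Int): Option[SimpleDecimal] = {
    val copy = SimpleDecimal(value)
    if (copy.changePrecision(precision, scale)) Some(copy) else None
  }
}

val original = SimpleDecimal(BigDecimal("1.2345678900"))
original.toPrecision(precision = 2, scale = 0)        // rounds a copy, yielding 1
assert(original.value == BigDecimal("1.2345678900"))  // the original is preserved
```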

## How was this patch tested?

New unit test added to existing suite `org.apache.spark.sql.MathFunctionsSuite`

Author: Wojtek Szymanski 

Closes #17075 from wojtek-szymanski/SPARK-19727.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e9e2c612
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e9e2c612
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e9e2c612

Branch: refs/heads/master
Commit: e9e2c612d58a19ddcb4b6abfb7389a4b0f7ef6f8
Parents: f3387d9
Author: Wojtek Szymanski 
Authored: Wed Mar 8 12:36:16 2017 -0800
Committer: Wenchen Fan 
Committed: Wed Mar 8 12:36:16 2017 -0800

--
 .../sql/catalyst/CatalystTypeConverters.scala   |  6 +
 .../spark/sql/catalyst/expressions/Cast.scala   | 13 +++--
 .../expressions/decimalExpressions.scala| 10 ++-
 .../catalyst/expressions/mathExpressions.scala  |  2 +-
 .../org/apache/spark/sql/types/Decimal.scala| 28 ++--
 .../apache/spark/sql/types/DecimalSuite.scala   |  8 +-
 .../apache/spark/sql/MathFunctionsSuite.scala   | 12 +
 7 files changed, 54 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e9e2c612/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
index 5b91615..d4ebdb1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystTypeConverters.scala
@@ -310,11 +310,7 @@ object CatalystTypeConverters {
 case d: JavaBigInteger => Decimal(d)
 case d: Decimal => d
   }
-  if (decimal.changePrecision(dataType.precision, dataType.scale)) {
-decimal
-  } else {
-null
-  }
+  decimal.toPrecision(dataType.precision, dataType.scale).orNull
 }
 override def toScala(catalystValue: Decimal): JavaBigDecimal = {
   if (catalystValue == null) null

http://git-wip-us.apache.org/repos/asf/spark/blob/e9e2c612/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
index 7c60f7d..1049915 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Cast.scala
@@ -352,6 +352,15 @@ case class Cast(child: Expression, dataType: DataType, 
timeZoneId: Option[String
 if (value.changePrecision(decimalType.precision, decimalType.scale)) value 
else null
   }
 
+  /**
+   * Create new `Decimal` with precision and scale given in `decimalType` (if 
any),
+   * returning null if it overflows or creating a new `value` and returning it 
if successful.
+   *
+   */
+  private[this] def toPrecision(value: Decimal, decimalType: DecimalType): 
Decimal =
+value.toPrecision(decimalType.precision, decimalType.scale).orNull
+
+
   private[this] def castToDecimal(from: DataType, target: DecimalType): Any => 
Any = from match {
 case StringType =>
   buildCast[UTF8String](_, s => try {
@@ -360,14 +369,14 @@ case class Cast(child: Expression, 

spark git commit: [SPARK-19843][SQL][FOLLOWUP] Classdoc for `IntWrapper` and `LongWrapper`

2017-03-08 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 9a6ac7226 -> e420fd459


[SPARK-19843][SQL][FOLLOWUP] Classdoc for `IntWrapper` and `LongWrapper`

## What changes were proposed in this pull request?

This is per rxin's suggestion at:
https://github.com/apache/spark/pull/17184#discussion_r104841735

It adds class-level Javadoc for `LongWrapper` and `IntWrapper` in `UTF8String` and moves `IntWrapper` next to `LongWrapper`.
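
A usage sketch of the wrappers being documented here (assuming the `toLong(LongWrapper)` overload added in the parent PR #17184):

```scala
import org.apache.spark.unsafe.types.UTF8String

val s = UTF8String.fromString("12345")
val result = new UTF8String.LongWrapper()
if (s.toLong(result)) {
  // On success the parsed value is written into the wrapper, so no boxed
  // Long needs to be allocated and returned.
  println(s"parsed: ${result.value}")
} else {
  println("not a valid long")
}
```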

## How was this patch tested?

N/A, as this is a documentation-only change.

Author: Tejas Patil 

Closes #17205 from tejasapatil/SPARK-19843_followup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e420fd45
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e420fd45
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e420fd45

Branch: refs/heads/master
Commit: e420fd4592615d91cdcbca674ac58bcca6ab2ff3
Parents: 9a6ac72
Author: Tejas Patil 
Authored: Wed Mar 8 09:38:05 2017 -0800
Committer: Reynold Xin 
Committed: Wed Mar 8 09:38:05 2017 -0800

--
 .../apache/spark/unsafe/types/UTF8String.java| 19 +++
 1 file changed, 15 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e420fd45/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
--
diff --git 
a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java 
b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
index 7abe0fa..4c28075 100644
--- a/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
+++ b/common/unsafe/src/main/java/org/apache/spark/unsafe/types/UTF8String.java
@@ -850,11 +850,26 @@ public final class UTF8String implements 
Comparable, Externalizable,
 return fromString(sb.toString());
   }
 
+  /**
+   * Wrapper over `long` to allow result of parsing long from string to be 
accessed via reference.
+   * This is done solely for better performance and is not expected to be used 
by end users.
+   */
   public static class LongWrapper {
 public long value = 0;
   }
 
   /**
+   * Wrapper over `int` to allow result of parsing integer from string to be 
accessed via reference.
+   * This is done solely for better performance and is not expected to be used 
by end users.
+   *
+   * {@link LongWrapper} could have been used here but using `int` directly 
save the extra cost of
+   * conversion from `long` -> `int`
+   */
+  public static class IntWrapper {
+public int value = 0;
+  }
+
+  /**
* Parses this UTF8String to long.
*
* Note that, in this method we accumulate the result in negative format, 
and convert it to
@@ -942,10 +957,6 @@ public final class UTF8String implements 
Comparable, Externalizable,
 return true;
   }
 
-  public static class IntWrapper {
-public int value = 0;
-  }
-
   /**
* Parses this UTF8String to int.
*





spark git commit: [SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-enabled Repartition

2017-03-08 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 5f7d835d3 -> 9a6ac7226


[SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-enabled 
Repartition

### What changes were proposed in this pull request?

As observed by felixcheung in https://github.com/apache/spark/pull/16739, when users call the shuffle-enabled `repartition` API they expect to get exactly the number of partitions they requested, even if they later call the shuffle-disabled `coalesce`.

Currently, the `CollapseRepartition` rule does not take into account whether shuffle is enabled, so we get the following unexpected results.

```Scala
val df = spark.range(0, 1, 1, 5)
val df2 = df.repartition(10)
assert(df2.coalesce(13).rdd.getNumPartitions == 5)
assert(df2.coalesce(7).rdd.getNumPartitions == 5)
assert(df2.coalesce(3).rdd.getNumPartitions == 3)
```

This PR fixes the issue by preserving the shuffle-enabled `Repartition`.
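
To make the intended semantics concrete, here is a toy sketch of the rule (a hypothetical `Plan` ADT, not the actual `CollapseRepartition` code in `Optimizer.scala`): collapsing only happens when it cannot discard a user-requested shuffle.

```scala
sealed trait Plan
case class Scan(numPartitions: Int) extends Plan
case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan

def collapse(plan: Plan): Plan = plan match {
  // A shuffle-enabled repartition fully determines the output partitioning,
  // so a repartition/coalesce directly below it can be dropped.
  case Repartition(n, true, Repartition(_, _, grandChild)) =>
    collapse(Repartition(n, shuffle = true, grandChild))
  // Adjacent shuffle-disabled coalesces collapse into the outer one.
  case Repartition(n, false, Repartition(_, false, grandChild)) =>
    collapse(Repartition(n, shuffle = false, grandChild))
  // A coalesce above a shuffle-enabled repartition is preserved: collapsing it
  // would silently drop the partition count the user asked the shuffle to produce.
  case Repartition(n, false, child @ Repartition(_, true, _)) =>
    Repartition(n, shuffle = false, collapse(child))
  case Repartition(n, s, child) => Repartition(n, s, collapse(child))
  case other => other
}

// repartition(10) followed by coalesce(13): both nodes survive, so the
// 10-partition shuffle is honored and the oversized coalesce is a no-op.
collapse(Repartition(13, shuffle = false, Repartition(10, shuffle = true, Scan(5))))
// => Repartition(13, false, Repartition(10, true, Scan(5)))
```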

### How was this patch tested?
Added a test case

Author: Xiao Li 

Closes #16933 from gatorsmile/CollapseRepartition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9a6ac722
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9a6ac722
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9a6ac722

Branch: refs/heads/master
Commit: 9a6ac7226fd09d570cae08d0daea82d9bca189a0
Parents: 5f7d835
Author: Xiao Li 
Authored: Wed Mar 8 09:36:01 2017 -0800
Committer: Xiao Li 
Committed: Wed Mar 8 09:36:01 2017 -0800

--
 R/pkg/inst/tests/testthat/test_sparkSQL.R   |   4 +-
 .../apache/spark/sql/catalyst/dsl/package.scala |   3 +
 .../sql/catalyst/optimizer/Optimizer.scala  |  32 ++--
 .../plans/logical/basicLogicalOperators.scala   |  16 +-
 .../optimizer/CollapseRepartitionSuite.scala| 153 +--
 .../scala/org/apache/spark/sql/Dataset.scala|  10 +-
 .../spark/sql/execution/PlannerSuite.scala  |   9 +-
 7 files changed, 178 insertions(+), 49 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9a6ac722/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 620b633..9735fe3 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2592,8 +2592,8 @@ test_that("coalesce, repartition, numPartitions", {
 
   df2 <- repartition(df1, 10)
   expect_equal(getNumPartitions(df2), 10)
-  expect_equal(getNumPartitions(coalesce(df2, 13)), 5)
-  expect_equal(getNumPartitions(coalesce(df2, 7)), 5)
+  expect_equal(getNumPartitions(coalesce(df2, 13)), 10)
+  expect_equal(getNumPartitions(coalesce(df2, 7)), 7)
   expect_equal(getNumPartitions(coalesce(df2, 3)), 3)
 })
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9a6ac722/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 0f0d904..35ca2a0 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -370,6 +370,9 @@ package object dsl {
 
   def as(alias: String): LogicalPlan = SubqueryAlias(alias, logicalPlan)
 
+  def coalesce(num: Integer): LogicalPlan =
+Repartition(num, shuffle = false, logicalPlan)
+
   def repartition(num: Integer): LogicalPlan =
 Repartition(num, shuffle = true, logicalPlan)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9a6ac722/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index d5bbc6e..caafa1c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -564,27 +564,23 @@ object CollapseProject extends Rule[LogicalPlan] {
 }
 
 /**
- * Combines adjacent [[Repartition]] and [[RepartitionByExpression]] operator 
combinations
- * by keeping only the one.
- * 1. For adjacent [[Repartition]]s, collapse into the last [[Repartition]].
- * 2. For adjacent [[RepartitionByExpression]]s, collapse into the last 
[[RepartitionByExpression]].
- * 3. For a combination of 

spark git commit: [SPARK-19865][SQL] remove the view identifier in SubqueryAlias

2017-03-08 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master e44274870 -> 5f7d835d3


[SPARK-19865][SQL] remove the view identifier in SubqueryAlias

## What changes were proposed in this pull request?

Since we have a `View` node now, we can remove the view identifier in 
`SubqueryAlias`, which was used to indicate a view node before.

## How was this patch tested?

Update the related test cases.

Author: jiangxingbo 

Closes #17210 from jiangxb1987/SubqueryAlias.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5f7d835d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5f7d835d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5f7d835d

Branch: refs/heads/master
Commit: 5f7d835d380c1a558a4a6d8366140cd96ee202eb
Parents: e442748
Author: jiangxingbo 
Authored: Wed Mar 8 16:18:17 2017 +0100
Committer: Herman van Hovell 
Committed: Wed Mar 8 16:18:17 2017 +0100

--
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  4 ++--
 .../spark/sql/catalyst/catalog/SessionCatalog.scala |  8 
 .../org/apache/spark/sql/catalyst/dsl/package.scala |  4 ++--
 .../spark/sql/catalyst/optimizer/subquery.scala |  8 
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  6 +++---
 .../plans/logical/basicLogicalOperators.scala   |  3 +--
 .../spark/sql/catalyst/analysis/AnalysisSuite.scala | 16 
 .../sql/catalyst/catalog/SessionCatalogSuite.scala  |  6 +++---
 .../sql/catalyst/optimizer/ColumnPruningSuite.scala |  8 
 .../optimizer/EliminateSubqueryAliasesSuite.scala   |  6 +++---
 .../catalyst/optimizer/JoinOptimizationSuite.scala  |  8 
 .../spark/sql/catalyst/parser/PlanParserSuite.scala |  2 +-
 .../main/scala/org/apache/spark/sql/Dataset.scala   |  2 +-
 .../sql/execution/joins/BroadcastJoinSuite.scala|  3 ---
 .../spark/sql/hive/HiveMetastoreCatalogSuite.scala  |  2 +-
 .../spark/sql/hive/execution/SQLQuerySuite.scala|  2 +-
 16 files changed, 42 insertions(+), 46 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5f7d835d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index ffa5aed..93666f1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -598,7 +598,7 @@ class Analyzer(
   execute(child)
 }
 view.copy(child = newChild)
-  case p @ SubqueryAlias(_, view: View, _) =>
+  case p @ SubqueryAlias(_, view: View) =>
 val newChild = resolveRelation(view)
 p.copy(child = newChild)
   case _ => plan
@@ -2363,7 +2363,7 @@ class Analyzer(
  */
 object EliminateSubqueryAliases extends Rule[LogicalPlan] {
   def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
-case SubqueryAlias(_, child, _) => child
+case SubqueryAlias(_, child) => child
   }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/5f7d835d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index 498bfbd..831e37a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -578,7 +578,7 @@ class SessionCatalog(
   val table = formatTableName(name.table)
   if (db == globalTempViewManager.database) {
 globalTempViewManager.get(table).map { viewDef =>
-  SubqueryAlias(table, viewDef, None)
+  SubqueryAlias(table, viewDef)
 }.getOrElse(throw new NoSuchTableException(db, table))
   } else if (name.database.isDefined || !tempTables.contains(table)) {
 val metadata = externalCatalog.getTable(db, table)
@@ -591,17 +591,17 @@ class SessionCatalog(
 desc = metadata,
 output = metadata.schema.toAttributes,
 child = parser.parsePlan(viewText))
-  SubqueryAlias(table, child, Some(name.copy(table = table, database = 
Some(db
+  SubqueryAlias(table, child)
 } else {
   val tableRelation = CatalogRelation(
 metadata,
 // we assume all the columns are 

spark git commit: [SPARK-17080][SQL] join reorder

2017-03-08 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 9ea201cf6 -> e44274870


[SPARK-17080][SQL] join reorder

## What changes were proposed in this pull request?

Reorder the joins using a dynamic programming algorithm (as in the Selinger paper): first we put all items (base joined nodes) into level 1; then we build all two-way joins at level 2 from the plans at level 1 (single items); then we build all 3-way joins from plans at the previous levels (two-way joins and single items); then 4-way joins, and so on, until we have built all n-way joins and pick the best plan among them.

When building m-way joins, we only keep the best plan (with the lowest cost) 
for the same set of m items. E.g., for 3-way joins, we keep only the best plan 
for items {A, B, C} among plans (A J B) J C, (A J C) J B and (B J C) J A. Thus, 
the plans maintained for each level when reordering four items A, B, C, D are 
as follows:
```
level 1: p({A}), p({B}), p({C}), p({D})
level 2: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D})
level 3: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D})
level 4: p({A, B, C, D})
```
where p({A, B, C, D}) is the final output plan.

For cost evaluation, since physical costs for operators are not available 
currently, we use cardinalities and sizes to compute costs.
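
An illustrative sketch of this level-wise enumeration, simplified to a hypothetical `JoinPlan` with an abstract cost function and to combining a lower-level plan with a single item only (the real implementation in `CostBasedJoinReorder.scala` uses cardinality and size statistics):

```scala
import scala.collection.mutable

case class JoinPlan(itemIds: Set[Int], cost: Double)

def reorder(items: Seq[JoinPlan], joinCost: (JoinPlan, JoinPlan) => Double): JoinPlan = {
  // Level 1: one entry per single item, keyed by its item-id set.
  var levels = Vector(items.map(p => p.itemIds -> p).toMap)
  for (level <- 2 to items.size) {
    val next = mutable.Map.empty[Set[Int], JoinPlan]
    for {
      smaller <- levels(level - 2).values   // plans covering (level - 1) items
      single  <- levels(0).values           // single items
      if (smaller.itemIds & single.itemIds).isEmpty
    } {
      val key = smaller.itemIds ++ single.itemIds
      val candidate = JoinPlan(key, joinCost(smaller, single))
      // Keep only the cheapest plan for each distinct set of items.
      if (next.get(key).forall(_.cost > candidate.cost)) next(key) = candidate
    }
    levels :+= next.toMap
  }
  // The top level holds p({all items}), the final output plan.
  levels.last.values.minBy(_.cost)
}
```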

## How was this patch tested?
add test cases

Author: wangzhenhua 
Author: Zhenhua Wang 

Closes #17138 from wzhfy/joinReorder.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4427487
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4427487
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4427487

Branch: refs/heads/master
Commit: e44274870dee308f4e3e8ce79457d8d19693b6e5
Parents: 9ea201c
Author: wangzhenhua 
Authored: Wed Mar 8 16:01:28 2017 +0100
Committer: Herman van Hovell 
Committed: Wed Mar 8 16:01:28 2017 +0100

--
 .../spark/sql/catalyst/CatalystConf.scala   |   8 +
 .../optimizer/CostBasedJoinReorder.scala| 297 +++
 .../sql/catalyst/optimizer/Optimizer.scala  |   2 +
 .../catalyst/optimizer/JoinReorderSuite.scala   | 194 
 .../spark/sql/catalyst/plans/PlanTest.scala |   2 +-
 .../StatsEstimationTestBase.scala   |   4 +-
 .../org/apache/spark/sql/internal/SQLConf.scala |  16 +
 .../sql/execution/SparkSqlParserSuite.scala |   2 +-
 8 files changed, 521 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e4427487/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
index 5f50ce1..fb99cb2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/CatalystConf.scala
@@ -60,6 +60,12 @@ trait CatalystConf {
* Enables CBO for estimation of plan statistics when set true.
*/
   def cboEnabled: Boolean
+
+  /** Enables join reorder in CBO. */
+  def joinReorderEnabled: Boolean
+
+  /** The maximum number of joined nodes allowed in the dynamic programming 
algorithm. */
+  def joinReorderDPThreshold: Int
 }
 
 
@@ -75,6 +81,8 @@ case class SimpleCatalystConf(
 runSQLonFile: Boolean = true,
 crossJoinEnabled: Boolean = false,
 cboEnabled: Boolean = false,
+joinReorderEnabled: Boolean = false,
+joinReorderDPThreshold: Int = 12,
 warehousePath: String = "/user/hive/warehouse",
 sessionLocalTimeZone: String = TimeZone.getDefault().getID)
   extends CatalystConf

http://git-wip-us.apache.org/repos/asf/spark/blob/e4427487/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
new file mode 100644
index 000..b694561
--- /dev/null
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/CostBasedJoinReorder.scala
@@ -0,0 +1,297 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You 

spark git commit: [SPARK-16440][MLLIB] Ensure broadcasted variables are destroyed even in case of exception

2017-03-08 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 3f9f9180c -> 9ea201cf6


[SPARK-16440][MLLIB] Ensure broadcasted variables are destroyed even in case of 
exception

## What changes were proposed in this pull request?

Ensure broadcasted variables are destroyed even in case of an exception.
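
The pattern in a small self-contained sketch (a hypothetical `withBroadcast` helper, not part of Spark's public API):

```scala
import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast

// Run `body` with a broadcast of `value`, destroying the broadcast whether or
// not `body` throws.
def withBroadcast[T: ClassTag, R](sc: SparkContext, value: T)(body: Broadcast[T] => R): R = {
  val bc = sc.broadcast(value)
  try {
    body(bc)
  } finally {
    bc.destroy()   // released even if body failed
  }
}
```
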
## How was this patch tested?

Word2VecSuite was run locally

Author: Anthony Truchet 

Closes #14299 from AnthonyTruchet/SPARK-16440.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9ea201cf
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9ea201cf
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9ea201cf

Branch: refs/heads/master
Commit: 9ea201cf6482c9c62c9428759d238063db62d66e
Parents: 3f9f918
Author: Anthony Truchet 
Authored: Wed Mar 8 11:44:25 2017 +
Committer: Sean Owen 
Committed: Wed Mar 8 11:44:25 2017 +

--
 .../org/apache/spark/mllib/feature/Word2Vec.scala | 18 +++---
 1 file changed, 15 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9ea201cf/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 2364d43..531c8b0 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -30,6 +30,7 @@ import org.json4s.jackson.JsonMethods._
 import org.apache.spark.SparkContext
 import org.apache.spark.annotation.Since
 import org.apache.spark.api.java.JavaRDD
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.Logging
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.mllib.util.{Loader, Saveable}
@@ -314,6 +315,20 @@ class Word2Vec extends Serializable with Logging {
 val expTable = sc.broadcast(createExpTable())
 val bcVocab = sc.broadcast(vocab)
 val bcVocabHash = sc.broadcast(vocabHash)
+try {
+  doFit(dataset, sc, expTable, bcVocab, bcVocabHash)
+} finally {
+  expTable.destroy(blocking = false)
+  bcVocab.destroy(blocking = false)
+  bcVocabHash.destroy(blocking = false)
+}
+  }
+
+  private def doFit[S <: Iterable[String]](
+dataset: RDD[S], sc: SparkContext,
+expTable: Broadcast[Array[Float]],
+bcVocab: Broadcast[Array[VocabWord]],
+bcVocabHash: Broadcast[mutable.HashMap[String, Int]]) = {
 // each partition is a collection of sentences,
 // will be translated into arrays of Index integer
 val sentences: RDD[Array[Int]] = dataset.mapPartitions { sentenceIter =>
@@ -435,9 +450,6 @@ class Word2Vec extends Serializable with Logging {
   bcSyn1Global.destroy(false)
 }
 newSentences.unpersist()
-expTable.destroy(false)
-bcVocab.destroy(false)
-bcVocabHash.destroy(false)
 
 val wordArray = vocab.map(_.word)
 new Word2VecModel(wordArray.zipWithIndex.toMap, syn0Global)





spark git commit: [SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically converted to spark.sql.shuffle.partitions

2017-03-08 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 81303f7ca -> 3f9f9180c


[SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically converted 
to spark.sql.shuffle.partitions

## What changes were proposed in this pull request?
Make `SET mapreduce.job.reduces` automatically convert to `spark.sql.shuffle.partitions`, similar to `SET mapred.reduce.tasks`.
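
A usage sketch of the new behavior in `spark-shell` (the value 8 is just an example; it matches the test added to `SQLQuerySuite`):

```scala
// Setting the Hadoop key through Spark SQL now updates
// spark.sql.shuffle.partitions and logs a warning about the conversion.
spark.sql("SET mapreduce.job.reduces=8")
spark.conf.get("spark.sql.shuffle.partitions")   // "8"

// Non-positive values are rejected, mirroring the mapred.reduce.tasks handling:
// spark.sql("SET mapreduce.job.reduces=-1")     // IllegalArgumentException
```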

## How was this patch tested?

unit tests

Author: Yuming Wang 

Closes #17020 from wangyum/SPARK-19693.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3f9f9180
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3f9f9180
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3f9f9180

Branch: refs/heads/master
Commit: 3f9f9180c2e695ad468eb813df5feec41e169531
Parents: 81303f7
Author: Yuming Wang 
Authored: Wed Mar 8 11:31:01 2017 +
Committer: Sean Owen 
Committed: Wed Mar 8 11:31:01 2017 +

--
 .../spark/sql/execution/command/SetCommand.scala   | 17 +
 .../org/apache/spark/sql/internal/SQLConf.scala|  4 
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 12 
 3 files changed, 33 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3f9f9180/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
index 7afa4e7..5f12830 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/SetCommand.scala
@@ -60,6 +60,23 @@ case class SetCommand(kv: Option[(String, Option[String])]) 
extends RunnableComm
   }
   (keyValueOutput, runFunc)
 
+case Some((SQLConf.Replaced.MAPREDUCE_JOB_REDUCES, Some(value))) =>
+  val runFunc = (sparkSession: SparkSession) => {
+logWarning(
+  s"Property ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} is Hadoop's 
property, " +
+s"automatically converted to ${SQLConf.SHUFFLE_PARTITIONS.key} 
instead.")
+if (value.toInt < 1) {
+  val msg =
+s"Setting negative ${SQLConf.Replaced.MAPREDUCE_JOB_REDUCES} for 
automatically " +
+  "determining the number of reducers is not supported."
+  throw new IllegalArgumentException(msg)
+} else {
+  sparkSession.conf.set(SQLConf.SHUFFLE_PARTITIONS.key, value)
+  Seq(Row(SQLConf.SHUFFLE_PARTITIONS.key, value))
+}
+  }
+  (keyValueOutput, runFunc)
+
 case Some((key @ SetCommand.VariableName(name), Some(value))) =>
   val runFunc = (sparkSession: SparkSession) => {
 sparkSession.conf.set(name, value)

http://git-wip-us.apache.org/repos/asf/spark/blob/3f9f9180/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
index 461dfe3..fd3acd4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
@@ -677,6 +677,10 @@ object SQLConf {
   object Deprecated {
 val MAPRED_REDUCE_TASKS = "mapred.reduce.tasks"
   }
+
+  object Replaced {
+val MAPREDUCE_JOB_REDUCES = "mapreduce.job.reduces"
+  }
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/3f9f9180/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 468ea05..d9e0196 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1019,6 +1019,18 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
 spark.sessionState.conf.clear()
   }
 
+  test("SET mapreduce.job.reduces automatically converted to 
spark.sql.shuffle.partitions") {
+spark.sessionState.conf.clear()
+val before = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key).toInt
+val newConf = before + 1
+sql(s"SET mapreduce.job.reduces=${newConf.toString}")
+val after = spark.conf.get(SQLConf.SHUFFLE_PARTITIONS.key).toInt
+assert(before != after)
+assert(newConf === after)
+intercept[IllegalArgumentException](sql(s"SET 

spark git commit: [SPARK-19806][ML][PYSPARK] PySpark GeneralizedLinearRegression supports tweedie distribution.

2017-03-08 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 1fa58868b -> 81303f7ca


[SPARK-19806][ML][PYSPARK] PySpark GeneralizedLinearRegression supports tweedie 
distribution.

## What changes were proposed in this pull request?
PySpark ```GeneralizedLinearRegression``` now supports the tweedie distribution.
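
For reference, the Scala-side configuration that the PySpark wrapper mirrors (a sketch; the variance power 1.5 and log link are example values, and the setter names are assumed to match the `variancePower`/`linkPower` params shown in the Scala diff below):

```scala
import org.apache.spark.ml.regression.GeneralizedLinearRegression

val glr = new GeneralizedLinearRegression()
  .setFamily("tweedie")
  .setVariancePower(1.5)   // variance power in (1, 2): compound Poisson-gamma
  .setLinkPower(0.0)       // link power 0.0 corresponds to the log link
```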

## How was this patch tested?
Add unit tests.

Author: Yanbo Liang 

Closes #17146 from yanboliang/spark-19806.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81303f7c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81303f7c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81303f7c

Branch: refs/heads/master
Commit: 81303f7ca7808d51229411dce8feeed8c23dbe15
Parents: 1fa5886
Author: Yanbo Liang 
Authored: Wed Mar 8 02:09:36 2017 -0800
Committer: Yanbo Liang 
Committed: Wed Mar 8 02:09:36 2017 -0800

--
 .../GeneralizedLinearRegression.scala   |  8 +--
 python/pyspark/ml/regression.py | 61 +---
 python/pyspark/ml/tests.py  | 20 +++
 3 files changed, 77 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/81303f7c/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
 
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
index 110764d..3be8b53 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/regression/GeneralizedLinearRegression.scala
@@ -66,7 +66,7 @@ private[regression] trait GeneralizedLinearRegressionBase 
extends PredictorParam
   /**
* Param for the power in the variance function of the Tweedie distribution 
which provides
* the relationship between the variance and mean of the distribution.
-   * Only applicable for the Tweedie family.
+   * Only applicable to the Tweedie family.
* (see https://en.wikipedia.org/wiki/Tweedie_distribution;>
* Tweedie Distribution (Wikipedia))
* Supported values: 0 and [1, Inf).
@@ -79,7 +79,7 @@ private[regression] trait GeneralizedLinearRegressionBase 
extends PredictorParam
   final val variancePower: DoubleParam = new DoubleParam(this, "variancePower",
 "The power in the variance function of the Tweedie distribution which 
characterizes " +
 "the relationship between the variance and mean of the distribution. " +
-"Only applicable for the Tweedie family. Supported values: 0 and [1, 
Inf).",
+"Only applicable to the Tweedie family. Supported values: 0 and [1, Inf).",
 (x: Double) => x >= 1.0 || x == 0.0)
 
   /** @group getParam */
@@ -106,7 +106,7 @@ private[regression] trait GeneralizedLinearRegressionBase 
extends PredictorParam
   def getLink: String = $(link)
 
   /**
-   * Param for the index in the power link function. Only applicable for the 
Tweedie family.
+   * Param for the index in the power link function. Only applicable to the 
Tweedie family.
* Note that link power 0, 1, -1 or 0.5 corresponds to the Log, Identity, 
Inverse or Sqrt
* link, respectively.
* When not set, this value defaults to 1 - [[variancePower]], which matches 
the R "statmod"
@@ -116,7 +116,7 @@ private[regression] trait GeneralizedLinearRegressionBase 
extends PredictorParam
*/
   @Since("2.2.0")
   final val linkPower: DoubleParam = new DoubleParam(this, "linkPower",
-"The index in the power link function. Only applicable for the Tweedie 
family.")
+"The index in the power link function. Only applicable to the Tweedie 
family.")
 
   /** @group getParam */
   @Since("2.2.0")

http://git-wip-us.apache.org/repos/asf/spark/blob/81303f7c/python/pyspark/ml/regression.py
--
diff --git a/python/pyspark/ml/regression.py b/python/pyspark/ml/regression.py
index b199bf2..3c3fcc8 100644
--- a/python/pyspark/ml/regression.py
+++ b/python/pyspark/ml/regression.py
@@ -1294,8 +1294,8 @@ class GeneralizedLinearRegression(JavaEstimator, 
HasLabelCol, HasFeaturesCol, Ha
 
 Fit a Generalized Linear Model specified by giving a symbolic description 
of the linear
 predictor (link function) and a description of the error distribution 
(family). It supports
-"gaussian", "binomial", "poisson" and "gamma" as family. Valid link 
functions for each family
-is listed below. The first link function of each family is the default one.
+"gaussian", "binomial", "poisson", "gamma" and "tweedie" as family. Valid 
link functions for
+each family is listed below. 

spark git commit: [SPARK-18055][SQL] Use correct mirror in ExpresionEncoder

2017-03-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e69902806 -> da3dfafa9


[SPARK-18055][SQL] Use correct mirror in ExpresionEncoder

Previously, we were using the mirror of the passed-in `TypeTag` when reflecting to build an encoder. This fails when the outer class is built in (i.e. `Seq`'s default mirror is based on the root classloader) but the inner classes (i.e. `A` in `Seq[A]`) are defined in the REPL or a library.

This patch changes us to always reflect based on a mirror created using the 
context classloader.
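
The core idea, sketched outside of Spark with hypothetical helper names (the actual change is the two-line diff in `ExpressionEncoder.apply` below):

```scala
import scala.reflect.runtime.universe._

// Build the mirror from the thread's context classloader, which can see REPL-
// and library-defined classes, instead of trusting typeTag[T].mirror.
def contextMirror: Mirror =
  runtimeMirror(Thread.currentThread().getContextClassLoader)

// Re-resolve the tag in that mirror before extracting the Type.
def typeInContext[T: TypeTag]: Type =
  typeTag[T].in(contextMirror).tpe
```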

Author: Michael Armbrust 

Closes #17201 from marmbrus/replSeqEncoder.

(cherry picked from commit 314e48a3584bad4b486b046bbf0159d64ba857bc)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/da3dfafa
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/da3dfafa
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/da3dfafa

Branch: refs/heads/branch-2.0
Commit: da3dfafa9725d7ff60831e11c4f77d21a0ae2204
Parents: e699028
Author: Michael Armbrust 
Authored: Wed Mar 8 01:32:42 2017 -0800
Committer: Wenchen Fan 
Committed: Wed Mar 8 01:34:25 2017 -0800

--
 .../src/test/scala/org/apache/spark/repl/ReplSuite.scala | 11 +++
 .../spark/sql/catalyst/encoders/ExpressionEncoder.scala  |  4 ++--
 2 files changed, 13 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/da3dfafa/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
--
diff --git 
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index f7d7a4f..ad060dd 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -473,4 +473,15 @@ class ReplSuite extends SparkFunSuite {
 assertDoesNotContain("AssertionError", output)
 assertDoesNotContain("Exception", output)
   }
+
+  test("newProductSeqEncoder with REPL defined class") {
+val output = runInterpreterInPasteMode("local-cluster[1,4,4096]",
+  """
+  |case class Click(id: Int)
+  |spark.implicits.newProductSeqEncoder[Click]
+""".stripMargin)
+
+assertDoesNotContain("error:", output)
+assertDoesNotContain("Exception", output)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/da3dfafa/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 1fac26c..eb4ca8e 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -45,8 +45,8 @@ import org.apache.spark.util.Utils
 object ExpressionEncoder {
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
 // We convert the not-serializable TypeTag into StructType and ClassTag.
-val mirror = typeTag[T].mirror
-val tpe = typeTag[T].tpe
+val mirror = ScalaReflection.mirror
+val tpe = typeTag[T].in(mirror).tpe
 val cls = mirror.runtimeClass(tpe)
 val flat = !ScalaReflection.definedByConstructorParams(tpe)
 





spark git commit: [SPARK-18055][SQL] Use correct mirror in ExpresionEncoder

2017-03-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 0ba9ecbea -> 320eff14b


[SPARK-18055][SQL] Use correct mirror in ExpresionEncoder

Previously, we were using the mirror of the passed-in `TypeTag` when reflecting to build an encoder. This fails when the outer class is built in (i.e. `Seq`'s default mirror is based on the root classloader) but the inner classes (i.e. `A` in `Seq[A]`) are defined in the REPL or a library.

This patch changes us to always reflect based on a mirror created using the 
context classloader.

Author: Michael Armbrust 

Closes #17201 from marmbrus/replSeqEncoder.

(cherry picked from commit 314e48a3584bad4b486b046bbf0159d64ba857bc)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/320eff14
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/320eff14
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/320eff14

Branch: refs/heads/branch-2.1
Commit: 320eff14b0bb634eba2cdcae2303ba38fd0eb282
Parents: 0ba9ecb
Author: Michael Armbrust 
Authored: Wed Mar 8 01:32:42 2017 -0800
Committer: Wenchen Fan 
Committed: Wed Mar 8 01:32:51 2017 -0800

--
 .../src/test/scala/org/apache/spark/repl/ReplSuite.scala | 11 +++
 .../spark/sql/catalyst/encoders/ExpressionEncoder.scala  |  4 ++--
 2 files changed, 13 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/320eff14/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
--
diff --git 
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 9262e93..5ef3987 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -473,4 +473,15 @@ class ReplSuite extends SparkFunSuite {
 assertDoesNotContain("AssertionError", output)
 assertDoesNotContain("Exception", output)
   }
+
+  test("newProductSeqEncoder with REPL defined class") {
+val output = runInterpreterInPasteMode("local-cluster[1,4,4096]",
+  """
+  |case class Click(id: Int)
+  |spark.implicits.newProductSeqEncoder[Click]
+""".stripMargin)
+
+assertDoesNotContain("error:", output)
+assertDoesNotContain("Exception", output)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/320eff14/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 9c4818d..f7999a3 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -45,8 +45,8 @@ import org.apache.spark.util.Utils
 object ExpressionEncoder {
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
 // We convert the not-serializable TypeTag into StructType and ClassTag.
-val mirror = typeTag[T].mirror
-val tpe = typeTag[T].tpe
+val mirror = ScalaReflection.mirror
+val tpe = typeTag[T].in(mirror).tpe
 
 if (ScalaReflection.optionOfProductType(tpe)) {
   throw new UnsupportedOperationException(





spark git commit: [SPARK-18055][SQL] Use correct mirror in ExpresionEncoder

2017-03-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 56e1bd337 -> 314e48a35


[SPARK-18055][SQL] Use correct mirror in ExpresionEncoder

Previously, we were using the mirror of the passed-in `TypeTag` when reflecting to build an encoder. This fails when the outer class is built in (i.e. `Seq`'s default mirror is based on the root classloader) but the inner classes (i.e. `A` in `Seq[A]`) are defined in the REPL or a library.

This patch changes us to always reflect based on a mirror created using the 
context classloader.

Author: Michael Armbrust 

Closes #17201 from marmbrus/replSeqEncoder.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/314e48a3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/314e48a3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/314e48a3

Branch: refs/heads/master
Commit: 314e48a3584bad4b486b046bbf0159d64ba857bc
Parents: 56e1bd3
Author: Michael Armbrust 
Authored: Wed Mar 8 01:32:42 2017 -0800
Committer: Wenchen Fan 
Committed: Wed Mar 8 01:32:42 2017 -0800

--
 .../src/test/scala/org/apache/spark/repl/ReplSuite.scala | 11 +++
 .../spark/sql/catalyst/encoders/ExpressionEncoder.scala  |  4 ++--
 2 files changed, 13 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/314e48a3/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
--
diff --git 
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 55c9167..121a02a 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -473,4 +473,15 @@ class ReplSuite extends SparkFunSuite {
 assertDoesNotContain("AssertionError", output)
 assertDoesNotContain("Exception", output)
   }
+
+  test("newProductSeqEncoder with REPL defined class") {
+val output = runInterpreterInPasteMode("local-cluster[1,4,4096]",
+  """
+  |case class Click(id: Int)
+  |spark.implicits.newProductSeqEncoder[Click]
+""".stripMargin)
+
+assertDoesNotContain("error:", output)
+assertDoesNotContain("Exception", output)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/314e48a3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
index 0782143..93fc565 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/encoders/ExpressionEncoder.scala
@@ -45,8 +45,8 @@ import org.apache.spark.util.Utils
 object ExpressionEncoder {
   def apply[T : TypeTag](): ExpressionEncoder[T] = {
 // We convert the not-serializable TypeTag into StructType and ClassTag.
-val mirror = typeTag[T].mirror
-val tpe = typeTag[T].tpe
+val mirror = ScalaReflection.mirror
+val tpe = typeTag[T].in(mirror).tpe
 
 if (ScalaReflection.optionOfProductType(tpe)) {
   throw new UnsupportedOperationException(

