spark git commit: [SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent

2016-11-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 85dd07374 -> 4c4bf87ac


[SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent

## What changes were proposed in this pull request?

The PR fixes the bug that the QueryStartedEvent is not logged

The postToAll() in the original code is actually calling 
StreamingQueryListenerBus.postToAll(), which has no listeners at all. We shall 
post via sparkListenerBus.post(s) and this.postToAll() to trigger local 
listeners as well as the listeners registered in LiveListenerBus.

zsxwing
## How was this patch tested?

The following snapshot shows that QueryStartedEvent has been logged correctly

![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png)

Author: CodingCat 

Closes #15675 from CodingCat/SPARK-18144.

(cherry picked from commit 85c5424d466f4a5765c825e0e2ab30da97611285)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4c4bf87a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4c4bf87a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4c4bf87a

Branch: refs/heads/branch-2.1
Commit: 4c4bf87acf2516a72b59f4e760413f80640dca1e
Parents: 85dd0737
Author: CodingCat 
Authored: Tue Nov 1 23:39:53 2016 -0700
Committer: Shixiong Zhu 
Committed: Tue Nov 1 23:40:00 2016 -0700

--
 .../execution/streaming/StreamingQueryListenerBus.scala   | 10 +-
 .../apache/spark/sql/streaming/StreamingQuerySuite.scala  |  7 ++-
 2 files changed, 15 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4c4bf87a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index fc2190d..22e4c63 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
   def post(event: StreamingQueryListener.Event) {
 event match {
   case s: QueryStartedEvent =>
+sparkListenerBus.post(s)
+// post to local listeners to trigger callbacks
 postToAll(s)
   case _ =>
 sparkListenerBus.post(event)
@@ -50,7 +52,13 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
 event match {
   case e: StreamingQueryListener.Event =>
-postToAll(e)
+// SPARK-18144: we broadcast QueryStartedEvent to all listeners 
attached to this bus
+// synchronously and the ones attached to LiveListenerBus 
asynchronously. Therefore,
+// we need to ignore QueryStartedEvent if this method is called within 
SparkListenerBus
+// thread
+if (!LiveListenerBus.withinListenerThread.value || 
!e.isInstanceOf[QueryStartedEvent]) {
+  postToAll(e)
+}
   case _ =>
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4c4bf87a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 464c443..31b7fe0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
 // A StreamingQueryListener that gets the query status after the first 
completed trigger
 val listener = new StreamingQueryListener {
   @volatile var firstStatus: StreamingQueryStatus = null
-  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
+  @volatile var queryStartedEvent = 0
+  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+queryStartedEvent += 1
+  }
   override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
if (firstStatus == null) firstStatus = queryProgress.queryStatus
   }
@@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
   q.processAllAvailable()
   eventually(timeout(streamingTimeou

spark git commit: [SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent

2016-11-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d401a74d4 -> 81f080425


[SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent

## What changes were proposed in this pull request?

The PR fixes the bug that the QueryStartedEvent is not logged

The postToAll() in the original code is actually calling 
StreamingQueryListenerBus.postToAll(), which has no listeners at all. We shall 
post via sparkListenerBus.post(s) and this.postToAll() to trigger local 
listeners as well as the listeners registered in LiveListenerBus.

zsxwing
## How was this patch tested?

The following snapshot shows that QueryStartedEvent has been logged correctly

![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png)

Author: CodingCat 

Closes #15675 from CodingCat/SPARK-18144.

(cherry picked from commit 85c5424d466f4a5765c825e0e2ab30da97611285)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/81f08042
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/81f08042
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/81f08042

Branch: refs/heads/branch-2.0
Commit: 81f0804252bbfdc280e013e5f7b016759e66406e
Parents: d401a74
Author: CodingCat 
Authored: Tue Nov 1 23:39:53 2016 -0700
Committer: Shixiong Zhu 
Committed: Tue Nov 1 23:40:12 2016 -0700

--
 .../execution/streaming/StreamingQueryListenerBus.scala   | 10 +-
 .../apache/spark/sql/streaming/StreamingQuerySuite.scala  |  7 ++-
 2 files changed, 15 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/81f08042/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index fc2190d..22e4c63 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
   def post(event: StreamingQueryListener.Event) {
 event match {
   case s: QueryStartedEvent =>
+sparkListenerBus.post(s)
+// post to local listeners to trigger callbacks
 postToAll(s)
   case _ =>
 sparkListenerBus.post(event)
@@ -50,7 +52,13 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
 event match {
   case e: StreamingQueryListener.Event =>
-postToAll(e)
+// SPARK-18144: we broadcast QueryStartedEvent to all listeners 
attached to this bus
+// synchronously and the ones attached to LiveListenerBus 
asynchronously. Therefore,
+// we need to ignore QueryStartedEvent if this method is called within 
SparkListenerBus
+// thread
+if (!LiveListenerBus.withinListenerThread.value || 
!e.isInstanceOf[QueryStartedEvent]) {
+  postToAll(e)
+}
   case _ =>
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/81f08042/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 464c443..31b7fe0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
 // A StreamingQueryListener that gets the query status after the first 
completed trigger
 val listener = new StreamingQueryListener {
   @volatile var firstStatus: StreamingQueryStatus = null
-  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
+  @volatile var queryStartedEvent = 0
+  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+queryStartedEvent += 1
+  }
   override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
if (firstStatus == null) firstStatus = queryProgress.queryStatus
   }
@@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
   q.processAllAvailable()
   eventually(timeout(streamingTimeout

spark git commit: [SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent

2016-11-01 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master a36653c5b -> 85c5424d4


[SPARK-18144][SQL] logging StreamingQueryListener$QueryStartedEvent

## What changes were proposed in this pull request?

The PR fixes the bug that the QueryStartedEvent is not logged

The postToAll() in the original code is actually calling 
StreamingQueryListenerBus.postToAll(), which has no listeners at all. We shall 
post via sparkListenerBus.post(s) and this.postToAll() to trigger local 
listeners as well as the listeners registered in LiveListenerBus.

zsxwing
## How was this patch tested?

The following snapshot shows that QueryStartedEvent has been logged correctly

![image](https://cloud.githubusercontent.com/assets/678008/19821553/007a7d28-9d2d-11e6-9f13-49851559cdaa.png)

Author: CodingCat 

Closes #15675 from CodingCat/SPARK-18144.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85c5424d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85c5424d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85c5424d

Branch: refs/heads/master
Commit: 85c5424d466f4a5765c825e0e2ab30da97611285
Parents: a36653c
Author: CodingCat 
Authored: Tue Nov 1 23:39:53 2016 -0700
Committer: Shixiong Zhu 
Committed: Tue Nov 1 23:39:53 2016 -0700

--
 .../execution/streaming/StreamingQueryListenerBus.scala   | 10 +-
 .../apache/spark/sql/streaming/StreamingQuerySuite.scala  |  7 ++-
 2 files changed, 15 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/85c5424d/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
index fc2190d..22e4c63 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingQueryListenerBus.scala
@@ -41,6 +41,8 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
   def post(event: StreamingQueryListener.Event) {
 event match {
   case s: QueryStartedEvent =>
+sparkListenerBus.post(s)
+// post to local listeners to trigger callbacks
 postToAll(s)
   case _ =>
 sparkListenerBus.post(event)
@@ -50,7 +52,13 @@ class StreamingQueryListenerBus(sparkListenerBus: 
LiveListenerBus)
   override def onOtherEvent(event: SparkListenerEvent): Unit = {
 event match {
   case e: StreamingQueryListener.Event =>
-postToAll(e)
+// SPARK-18144: we broadcast QueryStartedEvent to all listeners 
attached to this bus
+// synchronously and the ones attached to LiveListenerBus 
asynchronously. Therefore,
+// we need to ignore QueryStartedEvent if this method is called within 
SparkListenerBus
+// thread
+if (!LiveListenerBus.withinListenerThread.value || 
!e.isInstanceOf[QueryStartedEvent]) {
+  postToAll(e)
+}
   case _ =>
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/85c5424d/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 464c443..31b7fe0 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -290,7 +290,10 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
 // A StreamingQueryListener that gets the query status after the first 
completed trigger
 val listener = new StreamingQueryListener {
   @volatile var firstStatus: StreamingQueryStatus = null
-  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = { }
+  @volatile var queryStartedEvent = 0
+  override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
+queryStartedEvent += 1
+  }
   override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
if (firstStatus == null) firstStatus = queryProgress.queryStatus
   }
@@ -303,6 +306,8 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging {
   q.processAllAvailable()
   eventually(timeout(streamingTimeout)) {
 assert(listener.firstStatus != null)
+// test if QueryStartedEvent callback is called

spark git commit: [SPARK-18192] Support all file formats in structured streaming

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 e6509c245 -> 85dd07374


[SPARK-18192] Support all file formats in structured streaming

## What changes were proposed in this pull request?
This patch adds support for all file formats in structured streaming sinks. 
This is actually a very small change thanks to all the previous refactoring 
done using the new internal commit protocol API.

## How was this patch tested?
Updated FileStreamSinkSuite to add test cases for json, text, and parquet.

Author: Reynold Xin 

Closes #15711 from rxin/SPARK-18192.

(cherry picked from commit a36653c5b7b2719f8bfddf4ddfc6e1b828ac9af1)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85dd0737
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85dd0737
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85dd0737

Branch: refs/heads/branch-2.1
Commit: 85dd073743946383438aabb9f1281e6075f25cc5
Parents: e6509c2
Author: Reynold Xin 
Authored: Tue Nov 1 23:37:03 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 23:37:11 2016 -0700

--
 .../sql/execution/datasources/DataSource.scala  |  8 +--
 .../sql/streaming/FileStreamSinkSuite.scala | 62 +---
 2 files changed, 32 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/85dd0737/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d980e6a..3f956c4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -37,7 +36,6 @@ import 
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -292,7 +290,7 @@ case class DataSource(
   case s: StreamSinkProvider =>
 s.createSink(sparkSession.sqlContext, options, partitionColumns, 
outputMode)
 
-  case parquet: parquet.ParquetFileFormat =>
+  case fileFormat: FileFormat =>
 val caseInsensitiveOptions = new CaseInsensitiveMap(options)
 val path = caseInsensitiveOptions.getOrElse("path", {
   throw new IllegalArgumentException("'path' is not specified")
@@ -301,7 +299,7 @@ case class DataSource(
   throw new IllegalArgumentException(
 s"Data source $className does not support $outputMode output mode")
 }
-new FileStreamSink(sparkSession, path, parquet, partitionColumns, 
options)
+new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, 
options)
 
   case _ =>
 throw new UnsupportedOperationException(
@@ -516,7 +514,7 @@ case class DataSource(
   val plan = data.logicalPlan
   plan.resolve(name :: Nil, 
data.sparkSession.sessionState.analyzer.resolver).getOrElse {
 throw new AnalysisException(
-  s"Unable to resolve ${name} given 
[${plan.output.map(_.name).mkString(", ")}]")
+  s"Unable to resolve $name given 
[${plan.output.map(_.name).mkString(", ")}]")
   }.asInstanceOf[Attribute]
 }
 // For partitioned relation r, r.schema's column ordering can be 
different from the column

http://git-wip-us.apache.org/repos/asf/spark/blob/85dd0737/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 902cf05..0f140f9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b

spark git commit: [SPARK-18192] Support all file formats in structured streaming

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master abefe2ec4 -> a36653c5b


[SPARK-18192] Support all file formats in structured streaming

## What changes were proposed in this pull request?
This patch adds support for all file formats in structured streaming sinks. 
This is actually a very small change thanks to all the previous refactoring 
done using the new internal commit protocol API.

## How was this patch tested?
Updated FileStreamSinkSuite to add test cases for json, text, and parquet.

Author: Reynold Xin 

Closes #15711 from rxin/SPARK-18192.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a36653c5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a36653c5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a36653c5

Branch: refs/heads/master
Commit: a36653c5b7b2719f8bfddf4ddfc6e1b828ac9af1
Parents: abefe2e
Author: Reynold Xin 
Authored: Tue Nov 1 23:37:03 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 23:37:03 2016 -0700

--
 .../sql/execution/datasources/DataSource.scala  |  8 +--
 .../sql/streaming/FileStreamSinkSuite.scala | 62 +---
 2 files changed, 32 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a36653c5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index d980e6a..3f956c4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -29,7 +29,6 @@ import org.apache.hadoop.fs.Path
 import org.apache.spark.deploy.SparkHadoopUtil
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
 import org.apache.spark.sql.catalyst.catalog.{BucketSpec, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
@@ -37,7 +36,6 @@ import 
org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources._
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.{CalendarIntervalType, StructType}
@@ -292,7 +290,7 @@ case class DataSource(
   case s: StreamSinkProvider =>
 s.createSink(sparkSession.sqlContext, options, partitionColumns, 
outputMode)
 
-  case parquet: parquet.ParquetFileFormat =>
+  case fileFormat: FileFormat =>
 val caseInsensitiveOptions = new CaseInsensitiveMap(options)
 val path = caseInsensitiveOptions.getOrElse("path", {
   throw new IllegalArgumentException("'path' is not specified")
@@ -301,7 +299,7 @@ case class DataSource(
   throw new IllegalArgumentException(
 s"Data source $className does not support $outputMode output mode")
 }
-new FileStreamSink(sparkSession, path, parquet, partitionColumns, 
options)
+new FileStreamSink(sparkSession, path, fileFormat, partitionColumns, 
options)
 
   case _ =>
 throw new UnsupportedOperationException(
@@ -516,7 +514,7 @@ case class DataSource(
   val plan = data.logicalPlan
   plan.resolve(name :: Nil, 
data.sparkSession.sessionState.analyzer.resolver).getOrElse {
 throw new AnalysisException(
-  s"Unable to resolve ${name} given 
[${plan.output.map(_.name).mkString(", ")}]")
+  s"Unable to resolve $name given 
[${plan.output.map(_.name).mkString(", ")}]")
   }.asInstanceOf[Attribute]
 }
 // For partitioned relation r, r.schema's column ordering can be 
different from the column

http://git-wip-us.apache.org/repos/asf/spark/blob/a36653c5/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
index 902cf05..0f140f9 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSinkSuite.scala
@@ -17,7 +17,7 @@
 
 pac

spark git commit: [SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for Datasource tables

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 39d2fdb51 -> e6509c245


[SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for 
Datasource tables

There are a couple of issues with the current 2.1 behavior when inserting into 
Datasource tables with partitions managed by Hive.

(1) OVERWRITE TABLE ... PARTITION will actually overwrite the entire table 
instead of just the specified partition.
(2) INSERT|OVERWRITE does not work with partitions that have custom locations.

This PR fixes both of these issues for Datasource tables managed by Hive. The 
behavior for legacy tables or when `manageFilesourcePartitions = false` is 
unchanged.

There is one other issue in that INSERT OVERWRITE with dynamic partitions will 
overwrite the entire table instead of just the updated partitions, but this 
behavior is pretty complicated to implement for Datasource tables. We should 
address that in a future release.

Unit tests.

Author: Eric Liang 

Closes #15705 from ericl/sc-4942.

(cherry picked from commit abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e6509c24
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e6509c24
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e6509c24

Branch: refs/heads/branch-2.1
Commit: e6509c2459e7ece3c3c6bcd143b8cc71f8f4d5c8
Parents: 39d2fdb
Author: Eric Liang 
Authored: Wed Nov 2 14:15:10 2016 +0800
Committer: Reynold Xin 
Committed: Tue Nov 1 23:23:55 2016 -0700

--
 .../apache/spark/sql/catalyst/dsl/package.scala |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  9 +++-
 .../plans/logical/basicLogicalOperators.scala   | 19 ++-
 .../sql/catalyst/parser/PlanParserSuite.scala   | 15 --
 .../org/apache/spark/sql/DataFrameWriter.scala  |  4 +-
 .../datasources/CatalogFileIndex.scala  |  5 +-
 .../datasources/DataSourceStrategy.scala| 30 +--
 .../InsertIntoDataSourceCommand.scala   |  6 +--
 .../apache/spark/sql/hive/HiveStrategies.scala  |  3 +-
 .../CreateHiveTableAsSelectCommand.scala|  5 +-
 .../PartitionProviderCompatibilitySuite.scala   | 52 
 11 files changed, 129 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e6509c24/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 66e52ca..e901683 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
   def insertInto(tableName: String, overwrite: Boolean = false): 
LogicalPlan =
 InsertIntoTable(
   analysis.UnresolvedRelation(TableIdentifier(tableName)),
-  Map.empty, logicalPlan, overwrite, false)
+  Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
 
   def as(alias: String): LogicalPlan = logicalPlan match {
 case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, 
Option(alias))

http://git-wip-us.apache.org/repos/asf/spark/blob/e6509c24/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 38e9bb6..ac1577b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,12 +177,19 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
Logging {
   throw new ParseException(s"Dynamic partitions do not support IF NOT 
EXISTS. Specified " +
 "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", 
",", "]"), ctx)
 }
+val overwrite = ctx.OVERWRITE != null
+val overwritePartition =
+  if (overwrite && partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) 
{
+Some(partitionKeys.map(t => (t._1, t._2.get)))
+  } else {
+None
+  }
 
 InsertIntoTable(
   UnresolvedRelation(tableIdent, None),
   partitionKeys,
   query,
-  ctx.OVERWRITE != null,
+  OverwriteOptions(overwrite, overwritePartition),
   ctx.EXISTS != null)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/e6509c24/sql/catalyst/src/main/scala/org/apache/spark/s

spark git commit: [SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for Datasource tables

2016-11-01 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 620da3b48 -> abefe2ec4


[SPARK-18183][SPARK-18184] Fix INSERT [INTO|OVERWRITE] TABLE ... PARTITION for 
Datasource tables

## What changes were proposed in this pull request?

There are a couple of issues with the current 2.1 behavior when inserting into 
Datasource tables with partitions managed by Hive.

(1) OVERWRITE TABLE ... PARTITION will actually overwrite the entire table 
instead of just the specified partition.
(2) INSERT|OVERWRITE does not work with partitions that have custom locations.

This PR fixes both of these issues for Datasource tables managed by Hive. The 
behavior for legacy tables or when `manageFilesourcePartitions = false` is 
unchanged.

There is one other issue in that INSERT OVERWRITE with dynamic partitions will 
overwrite the entire table instead of just the updated partitions, but this 
behavior is pretty complicated to implement for Datasource tables. We should 
address that in a future release.

## How was this patch tested?

Unit tests.

Author: Eric Liang 

Closes #15705 from ericl/sc-4942.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/abefe2ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/abefe2ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/abefe2ec

Branch: refs/heads/master
Commit: abefe2ec428dc24a4112c623fb6fbe4b2ca60a2b
Parents: 620da3b
Author: Eric Liang 
Authored: Wed Nov 2 14:15:10 2016 +0800
Committer: Wenchen Fan 
Committed: Wed Nov 2 14:15:10 2016 +0800

--
 .../apache/spark/sql/catalyst/dsl/package.scala |  2 +-
 .../spark/sql/catalyst/parser/AstBuilder.scala  |  9 +++-
 .../plans/logical/basicLogicalOperators.scala   | 19 ++-
 .../sql/catalyst/parser/PlanParserSuite.scala   | 15 --
 .../org/apache/spark/sql/DataFrameWriter.scala  |  4 +-
 .../datasources/CatalogFileIndex.scala  |  5 +-
 .../datasources/DataSourceStrategy.scala| 30 +--
 .../InsertIntoDataSourceCommand.scala   |  6 +--
 .../apache/spark/sql/hive/HiveStrategies.scala  |  3 +-
 .../CreateHiveTableAsSelectCommand.scala|  5 +-
 .../PartitionProviderCompatibilitySuite.scala   | 52 
 11 files changed, 129 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
index 66e52ca..e901683 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/dsl/package.scala
@@ -367,7 +367,7 @@ package object dsl {
   def insertInto(tableName: String, overwrite: Boolean = false): 
LogicalPlan =
 InsertIntoTable(
   analysis.UnresolvedRelation(TableIdentifier(tableName)),
-  Map.empty, logicalPlan, overwrite, false)
+  Map.empty, logicalPlan, OverwriteOptions(overwrite), false)
 
   def as(alias: String): LogicalPlan = logicalPlan match {
 case UnresolvedRelation(tbl, _) => UnresolvedRelation(tbl, 
Option(alias))

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 38e9bb6..ac1577b 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -177,12 +177,19 @@ class AstBuilder extends SqlBaseBaseVisitor[AnyRef] with 
Logging {
   throw new ParseException(s"Dynamic partitions do not support IF NOT 
EXISTS. Specified " +
 "partitions with value: " + dynamicPartitionKeys.keys.mkString("[", 
",", "]"), ctx)
 }
+val overwrite = ctx.OVERWRITE != null
+val overwritePartition =
+  if (overwrite && partitionKeys.nonEmpty && dynamicPartitionKeys.isEmpty) 
{
+Some(partitionKeys.map(t => (t._1, t._2.get)))
+  } else {
+None
+  }
 
 InsertIntoTable(
   UnresolvedRelation(tableIdent, None),
   partitionKeys,
   query,
-  ctx.OVERWRITE != null,
+  OverwriteOptions(overwrite, overwritePartition),
   ctx.EXISTS != null)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/abefe2ec/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logi

spark git commit: [SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 1bbf9ff63 -> 39d2fdb51


[SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use 
checksum files

## What changes were proposed in this pull request?

When the metadata logs for various parts of Structured Streaming are stored on 
non-HDFS filesystems such as NFS or ext4, the HDFSMetadataLog class leaves 
hidden HDFS-style checksum (CRC) files in the log directory, one file per 
batch. This PR modifies HDFSMetadataLog so that it detects the use of a 
filesystem that doesn't use CRC files and removes the CRC files.
## How was this patch tested?

Modified an existing test case in HDFSMetadataLogSuite to check whether 
HDFSMetadataLog correctly removes CRC files on the local POSIX filesystem.  Ran 
the entire regression suite.

Author: frreiss 

Closes #15027 from frreiss/fred-17475.

(cherry picked from commit 620da3b4828b3580c7ed7339b2a07938e6be1bb1)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/39d2fdb5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/39d2fdb5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/39d2fdb5

Branch: refs/heads/branch-2.1
Commit: 39d2fdb51233ed9b1aaf3adaa3267853f5e58c0f
Parents: 1bbf9ff
Author: frreiss 
Authored: Tue Nov 1 23:00:17 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 23:00:28 2016 -0700

--
 .../apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 5 +
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala   | 6 ++
 2 files changed, 11 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/39d2fdb5/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index c7235320..9a0f87c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -148,6 +148,11 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: 
SparkSession, path: String)
   // It will fail if there is an existing file (someone has committed 
the batch)
   logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
   fileManager.rename(tempPath, batchIdToPath(batchId))
+
+  // SPARK-17475: HDFSMetadataLog should not leak CRC files
+  // If the underlying filesystem didn't rename the CRC file, delete 
it.
+  val crcPath = new Path(tempPath.getParent(), 
s".${tempPath.getName()}.crc")
+  if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
   return
 } catch {
   case e: IOException if isFileAlreadyExistsException(e) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/39d2fdb5/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9c1d26d..d03e08d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -119,6 +119,12 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
   assert(metadataLog.get(1).isEmpty)
   assert(metadataLog.get(2).isDefined)
   assert(metadataLog.getLatest().get._1 == 2)
+
+  // There should be exactly one file, called "2", in the metadata 
directory.
+  // This check also tests for regressions of SPARK-17475
+  val allFiles = new 
File(metadataLog.metadataPath.toString).listFiles().toSeq
+  assert(allFiles.size == 1)
+  assert(allFiles(0).getName() == "2")
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use checksum files

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 1bbf9ff63 -> 620da3b48


[SPARK-17475][STREAMING] Delete CRC files if the filesystem doesn't use 
checksum files

## What changes were proposed in this pull request?

When the metadata logs for various parts of Structured Streaming are stored on 
non-HDFS filesystems such as NFS or ext4, the HDFSMetadataLog class leaves 
hidden HDFS-style checksum (CRC) files in the log directory, one file per 
batch. This PR modifies HDFSMetadataLog so that it detects the use of a 
filesystem that doesn't use CRC files and removes the CRC files.
## How was this patch tested?

Modified an existing test case in HDFSMetadataLogSuite to check whether 
HDFSMetadataLog correctly removes CRC files on the local POSIX filesystem.  Ran 
the entire regression suite.

Author: frreiss 

Closes #15027 from frreiss/fred-17475.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/620da3b4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/620da3b4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/620da3b4

Branch: refs/heads/master
Commit: 620da3b4828b3580c7ed7339b2a07938e6be1bb1
Parents: 1bbf9ff
Author: frreiss 
Authored: Tue Nov 1 23:00:17 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 23:00:17 2016 -0700

--
 .../apache/spark/sql/execution/streaming/HDFSMetadataLog.scala | 5 +
 .../spark/sql/execution/streaming/HDFSMetadataLogSuite.scala   | 6 ++
 2 files changed, 11 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/620da3b4/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
index c7235320..9a0f87c 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLog.scala
@@ -148,6 +148,11 @@ class HDFSMetadataLog[T: ClassTag](sparkSession: 
SparkSession, path: String)
   // It will fail if there is an existing file (someone has committed 
the batch)
   logDebug(s"Attempting to write log #${batchIdToPath(batchId)}")
   fileManager.rename(tempPath, batchIdToPath(batchId))
+
+  // SPARK-17475: HDFSMetadataLog should not leak CRC files
+  // If the underlying filesystem didn't rename the CRC file, delete 
it.
+  val crcPath = new Path(tempPath.getParent(), 
s".${tempPath.getName()}.crc")
+  if (fileManager.exists(crcPath)) fileManager.delete(crcPath)
   return
 } catch {
   case e: IOException if isFileAlreadyExistsException(e) =>

http://git-wip-us.apache.org/repos/asf/spark/blob/620da3b4/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
index 9c1d26d..d03e08d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/HDFSMetadataLogSuite.scala
@@ -119,6 +119,12 @@ class HDFSMetadataLogSuite extends SparkFunSuite with 
SharedSQLContext {
   assert(metadataLog.get(1).isEmpty)
   assert(metadataLog.get(2).isDefined)
   assert(metadataLog.getLatest().get._1 == 2)
+
+  // There should be exactly one file, called "2", in the metadata 
directory.
+  // This check also tests for regressions of SPARK-17475
+  val allFiles = new 
File(metadataLog.metadataPath.toString).listFiles().toSeq
+  assert(allFiles.size == 1)
+  assert(allFiles(0).getName() == "2")
 }
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



[spark] Git Push Summary

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 [created] 1bbf9ff63

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-17992][SQL] Return all partitions from HiveShim when Hive throws a metastore exception when attempting to fetch partitions by filter

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 1ecfafa08 -> 1bbf9ff63


[SPARK-17992][SQL] Return all partitions from HiveShim when Hive throws a 
metastore exception when attempting to fetch partitions by filter

(Link to Jira issue: https://issues.apache.org/jira/browse/SPARK-17992)
## What changes were proposed in this pull request?

We recently added table partition pruning for partitioned Hive tables converted 
to using `TableFileCatalog`. When the Hive configuration option 
`hive.metastore.try.direct.sql` is set to `false`, Hive will throw an exception 
for unsupported filter expressions. For example, attempting to filter on an 
integer partition column will throw a 
`org.apache.hadoop.hive.metastore.api.MetaException`.

I discovered this behavior because VideoAmp uses the CDH version of Hive with a 
Postgresql metastore DB. In this configuration, CDH sets 
`hive.metastore.try.direct.sql` to `false` by default, and queries that filter 
on a non-string partition column will fail.

Rather than throw an exception in query planning, this patch catches this 
exception, logs a warning and returns all table partitions instead. Clients of 
this method are already expected to handle the possibility that the filters 
will not be honored.
## How was this patch tested?

A unit test was added.

Author: Michael Allman 

Closes #15673 from mallman/spark-17992-catch_hive_partition_filter_exception.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1bbf9ff6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1bbf9ff6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1bbf9ff6

Branch: refs/heads/master
Commit: 1bbf9ff634745148e782370009aa31d3a042638c
Parents: 1ecfafa
Author: Michael Allman 
Authored: Tue Nov 1 22:20:19 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 22:20:19 2016 -0700

--
 .../apache/spark/sql/hive/client/HiveShim.scala | 31 ++--
 .../sql/hive/client/HiveClientBuilder.scala | 56 ++
 .../spark/sql/hive/client/HiveClientSuite.scala | 61 
 .../spark/sql/hive/client/VersionsSuite.scala   | 77 +---
 4 files changed, 160 insertions(+), 65 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1bbf9ff6/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
index 85edaf6..3d9642d 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala
@@ -29,7 +29,7 @@ import scala.util.control.NonFatal
 
 import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, 
FunctionType, NoSuchObjectException, PrincipalType, ResourceType, ResourceUri}
+import org.apache.hadoop.hive.metastore.api.{Function => HiveFunction, 
FunctionType, MetaException, PrincipalType, ResourceType, ResourceUri}
 import org.apache.hadoop.hive.ql.Driver
 import org.apache.hadoop.hive.ql.metadata.{Hive, HiveException, Partition, 
Table}
 import org.apache.hadoop.hive.ql.plan.AddPartitionDesc
@@ -43,6 +43,7 @@ import org.apache.spark.sql.catalyst.FunctionIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPermanentFunctionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogFunction, 
CatalogTablePartition, FunctionResource, FunctionResourceType}
 import org.apache.spark.sql.catalyst.expressions._
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{IntegralType, StringType}
 import org.apache.spark.util.Utils
 
@@ -586,17 +587,31 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
 getAllPartitionsMethod.invoke(hive, 
table).asInstanceOf[JSet[Partition]]
   } else {
 logDebug(s"Hive metastore filter is '$filter'.")
+val tryDirectSqlConfVar = HiveConf.ConfVars.METASTORE_TRY_DIRECT_SQL
+val tryDirectSql =
+  hive.getConf.getBoolean(tryDirectSqlConfVar.varname, 
tryDirectSqlConfVar.defaultBoolVal)
 try {
+  // Hive may throw an exception when calling this method in some 
circumstances, such as
+  // when filtering on a non-string partition column when the hive 
config key
+  // hive.metastore.try.direct.sql is false
   getPartitionsByFilterMethod.invoke(hive, table, filter)
 .asInstanceOf[JArrayList[Partition]]
 } catch {
-  case e: InvocationTargetException =>
-// SPARK-18167 retry to investigate the flaky test. This should be 
revert

spark git commit: [SPARK-17838][SPARKR] Check named arguments for options and use formatted R friendly message from JVM exception message

2016-11-01 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master ad4832a9f -> 1ecfafa08


[SPARK-17838][SPARKR] Check named arguments for options and use formatted R 
friendly message from JVM exception message

## What changes were proposed in this pull request?

This PR proposes to
- show R-friendly error messages rather than the raw JVM exception messages.

  As `read.json`, `read.text`, `read.orc`, `read.parquet` and `read.jdbc` are 
executed in the same path as `read.df`, and `write.json`, `write.text`, 
`write.orc`, `write.parquet` and `write.jdbc` share the same path as 
`write.df`, it seems safe to call `handledCallJMethod` to handle
  JVM messages.
- prevent the `zero-length variable name` error and print the ignored options 
as a warning message.

**Before**

``` r
> read.json("path", a = 1, 2, 3, "a")
Error in env[[name]] <- value :
  zero-length variable name
```

``` r
> read.json("arbitrary_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/...;
  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:398)
  ...

> read.orc("arbitrary_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/...;
  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:398)
  ...

> read.text("arbitrary_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/...;
  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:398)
  ...

> read.parquet("arbitrary_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: Path does not exist: file:/...;
  at 
org.apache.spark.sql.execution.datasources.DataSource$$anonfun$12.apply(DataSource.scala:398)
  ...
```

``` r
> write.json(df, "existing_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: path file:/... already exists.;
  at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:68)

> write.orc(df, "existing_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: path file:/... already exists.;
  at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:68)

> write.text(df, "existing_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: path file:/... already exists.;
  at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:68)

> write.parquet(df, "existing_path")
Error in invokeJava(isStatic = FALSE, objId$id, methodName, ...) :
  org.apache.spark.sql.AnalysisException: path file:/... already exists.;
  at 
org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:68)
```

**After**

``` r
read.json("arbitrary_path", a = 1, 2, 3, "a")
Unnamed arguments ignored: 2, 3, a.
```

``` r
> read.json("arbitrary_path")
Error in json : analysis error - Path does not exist: file:/...

> read.orc("arbitrary_path")
Error in orc : analysis error - Path does not exist: file:/...

> read.text("arbitrary_path")
Error in text : analysis error - Path does not exist: file:/...

> read.parquet("arbitrary_path")
Error in parquet : analysis error - Path does not exist: file:/...
```

``` r
> write.json(df, "existing_path")
Error in json : analysis error - path file:/... already exists.;

> write.orc(df, "existing_path")
Error in orc : analysis error - path file:/... already exists.;

> write.text(df, "existing_path")
Error in text : analysis error - path file:/... already exists.;

> write.parquet(df, "existing_path")
Error in parquet : analysis error - path file:/... already exists.;
```
## How was this patch tested?

Unit tests in `test_utils.R` and `test_sparkSQL.R`.

Author: hyukjinkwon 

Closes #15608 from HyukjinKwon/SPARK-17838.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ecfafa0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ecfafa0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ecfafa0

Branch: refs/heads/master
Commit: 1ecfafa0869cb3a3e367bda8be252a69874dc4de
Parents: ad4832a
Author: hyukjinkwon 
Authored: Tue Nov 1 22:14:53 2016 -0700
Committer: Felix Cheung 
Committed: Tue Nov 1 22:14:53 2016 -0700

--
 R/pkg/R/DataFrame.R   | 10 +++---
 R/pkg/R/SQLContext.R  | 17 +-
 R/pkg/R/utils.R 

spark git commit: [SPARK-18216][SQL] Make Column.expr public

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 77a98162d -> ad4832a9f


[SPARK-18216][SQL] Make Column.expr public

## What changes were proposed in this pull request?
Column.expr is private[sql], but it's actually a really useful field to have 
for debugging. We should open it up, similar to how we use QueryExecution.

## How was this patch tested?
N/A - this is a simple visibility change.

Author: Reynold Xin 

Closes #15724 from rxin/SPARK-18216.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad4832a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad4832a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad4832a9

Branch: refs/heads/master
Commit: ad4832a9faf2c0c869bbcad9d71afe1cecbd3ec8
Parents: 77a9816
Author: Reynold Xin 
Authored: Tue Nov 1 21:20:53 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 21:20:53 2016 -0700

--
 sql/core/src/main/scala/org/apache/spark/sql/Column.scala | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ad4832a9/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
index 05e867b..249408e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Column.scala
@@ -118,6 +118,9 @@ class TypedColumn[-T, U](
  *   $"a" === $"b"
  * }}}
  *
+ * Note that the internal Catalyst expression can be accessed via "expr", but 
this method is for
+ * debugging purposes only and can change in any future Spark releases.
+ *
  * @groupname java_expr_ops Java-specific expression operators
  * @groupname expr_ops Expression operators
  * @groupname df_ops DataFrame functions
@@ -126,7 +129,7 @@ class TypedColumn[-T, U](
  * @since 1.3.0
  */
 @InterfaceStability.Stable
-class Column(protected[sql] val expr: Expression) extends Logging {
+class Column(val expr: Expression) extends Logging {
 
   def this(name: String) = this(name match {
 case "*" => UnresolvedStar(None)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18025] Use commit protocol API in structured streaming

2016-11-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master 91c33a0ca -> 77a98162d


[SPARK-18025] Use commit protocol API in structured streaming

## What changes were proposed in this pull request?
This patch adds a new commit protocol implementation ManifestFileCommitProtocol 
that follows the existing streaming flow, and uses it in FileStreamSink to 
consolidate the write path in structured streaming with the batch mode write 
path.

This deletes a lot of code, and would make it trivial to support other 
functionalities that are currently available in batch but not in streaming, 
including all file formats and bucketing.

## How was this patch tested?
Should be covered by existing tests.

Author: Reynold Xin 

Closes #15710 from rxin/SPARK-18025.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77a98162
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77a98162
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77a98162

Branch: refs/heads/master
Commit: 77a98162d1ec28247053b8b3ad4af28baa950797
Parents: 91c33a0
Author: Reynold Xin 
Authored: Tue Nov 1 18:06:57 2016 -0700
Committer: Michael Armbrust 
Committed: Tue Nov 1 18:06:57 2016 -0700

--
 .../datasources/FileCommitProtocol.scala|  11 +-
 .../sql/execution/datasources/FileFormat.scala  |  14 -
 .../datasources/FileFormatWriter.scala  | 400 ++
 .../InsertIntoHadoopFsRelationCommand.scala |  25 +-
 .../sql/execution/datasources/WriteOutput.scala | 406 ---
 .../datasources/parquet/ParquetFileFormat.scala |  11 -
 .../parquet/ParquetOutputWriter.scala   | 116 +-
 .../execution/streaming/FileStreamSink.scala| 229 ++-
 .../streaming/ManifestFileCommitProtocol.scala  | 114 ++
 .../org/apache/spark/sql/internal/SQLConf.scala |   3 +-
 .../sql/streaming/FileStreamSinkSuite.scala | 106 +
 11 files changed, 567 insertions(+), 868 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
index 1ce9ae4..f5dd5ce 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileCommitProtocol.scala
@@ -32,9 +32,9 @@ import org.apache.spark.util.Utils
 
 
 object FileCommitProtocol {
-  class TaskCommitMessage(obj: Any) extends Serializable
+  class TaskCommitMessage(val obj: Any) extends Serializable
 
-  object EmptyTaskCommitMessage extends TaskCommitMessage(Unit)
+  object EmptyTaskCommitMessage extends TaskCommitMessage(null)
 
   /**
* Instantiates a FileCommitProtocol using the given className.
@@ -62,8 +62,11 @@ object FileCommitProtocol {
 
 
 /**
- * An interface to define how a Spark job commits its outputs. Implementations 
must be serializable,
- * as the committer instance instantiated on the driver will be used for tasks 
on executors.
+ * An interface to define how a single Spark job commits its outputs. Two 
notes:
+ *
+ * 1. Implementations must be serializable, as the committer instance 
instantiated on the driver
+ *will be used for tasks on executors.
+ * 2. A committer should not be reused across multiple Spark jobs.
  *
  * The proper call sequence is:
  *

http://git-wip-us.apache.org/repos/asf/spark/blob/77a98162/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index 9d153ce..4f4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -56,20 +56,6 @@ trait FileFormat {
   dataSchema: StructType): OutputWriterFactory
 
   /**
-   * Returns a [[OutputWriterFactory]] for generating output writers that can 
write data.
-   * This method is current used only by FileStreamSinkWriter to generate 
output writers that
-   * does not use output committers to write data. The OutputWriter generated 
by the returned
-   * [[OutputWriterFactory]] must implement the method `newWriter(path)`..
-   */
-  def buildWriter(
-  sqlContext: SQLContext,
-  dataSchema: StructType,
-  options: Map[String, String]): OutputWrite

spark git commit: [SPARK-18088][ML] Various ChiSqSelector cleanups

2016-11-01 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master b929537b6 -> 91c33a0ca


[SPARK-18088][ML] Various ChiSqSelector cleanups

## What changes were proposed in this pull request?
- Renamed kbest to numTopFeatures
- Renamed alpha to fpr
- Added missing Since annotations
- Doc cleanups
## How was this patch tested?

Added new standardized unit tests for spark.ml.
Improved existing unit test coverage a bit.

Author: Joseph K. Bradley 

Closes #15647 from jkbradley/chisqselector-follow-ups.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/91c33a0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/91c33a0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/91c33a0c

Branch: refs/heads/master
Commit: 91c33a0ca5c8287f710076ed7681e5aa13ca068f
Parents: b929537
Author: Joseph K. Bradley 
Authored: Tue Nov 1 17:00:00 2016 -0700
Committer: Joseph K. Bradley 
Committed: Tue Nov 1 17:00:00 2016 -0700

--
 docs/ml-features.md |  12 +-
 docs/mllib-feature-extraction.md|  15 +--
 .../apache/spark/ml/feature/ChiSqSelector.scala |  59 
 .../spark/mllib/api/python/PythonMLLibAPI.scala |   4 +-
 .../spark/mllib/feature/ChiSqSelector.scala |  45 +++
 .../spark/ml/feature/ChiSqSelectorSuite.scala   | 135 ++-
 .../mllib/feature/ChiSqSelectorSuite.scala  |  17 +--
 python/pyspark/ml/feature.py|  37 ++---
 python/pyspark/mllib/feature.py |  58 
 9 files changed, 197 insertions(+), 185 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/91c33a0c/docs/ml-features.md
--
diff --git a/docs/ml-features.md b/docs/ml-features.md
index 64c6a16..352887d 100644
--- a/docs/ml-features.md
+++ b/docs/ml-features.md
@@ -1338,14 +1338,14 @@ for more details on the API.
 `ChiSqSelector` stands for Chi-Squared feature selection. It operates on 
labeled data with
 categorical features. ChiSqSelector uses the
 [Chi-Squared test of 
independence](https://en.wikipedia.org/wiki/Chi-squared_test) to decide which
-features to choose. It supports three selection methods: `KBest`, `Percentile` 
and `FPR`:
+features to choose. It supports three selection methods: `numTopFeatures`, 
`percentile`, `fpr`:
 
-* `KBest` chooses the `k` top features according to a chi-squared test. This 
is akin to yielding the features with the most predictive power.
-* `Percentile` is similar to `KBest` but chooses a fraction of all features 
instead of a fixed number.
-* `FPR` chooses all features whose false positive rate meets some threshold.
+* `numTopFeatures` chooses a fixed number of top features according to a 
chi-squared test. This is akin to yielding the features with the most 
predictive power.
+* `percentile` is similar to `numTopFeatures` but chooses a fraction of all 
features instead of a fixed number.
+* `fpr` chooses all features whose p-value is below a threshold, thus 
controlling the false positive rate of selection.
 
-By default, the selection method is `KBest`, the default number of top 
features is 50. User can use
-`setNumTopFeatures`, `setPercentile` and `setAlpha` to set different selection 
methods.
+By default, the selection method is `numTopFeatures`, with the default number 
of top features set to 50.
+The user can choose a selection method using `setSelectorType`.
 
 **Examples**
 

http://git-wip-us.apache.org/repos/asf/spark/blob/91c33a0c/docs/mllib-feature-extraction.md
--
diff --git a/docs/mllib-feature-extraction.md b/docs/mllib-feature-extraction.md
index 87e1e02..42568c3 100644
--- a/docs/mllib-feature-extraction.md
+++ b/docs/mllib-feature-extraction.md
@@ -227,22 +227,19 @@ both speed and statistical learning behavior.
 
[`ChiSqSelector`](api/scala/index.html#org.apache.spark.mllib.feature.ChiSqSelector)
 implements
 Chi-Squared feature selection. It operates on labeled data with categorical 
features. ChiSqSelector uses the
 [Chi-Squared test of 
independence](https://en.wikipedia.org/wiki/Chi-squared_test) to decide which
-features to choose. It supports three selection methods: `KBest`, `Percentile` 
and `FPR`:
+features to choose. It supports three selection methods: `numTopFeatures`, 
`percentile`, `fpr`:
 
-* `KBest` chooses the `k` top features according to a chi-squared test. This 
is akin to yielding the features with the most predictive power.
-* `Percentile` is similar to `KBest` but chooses a fraction of all features 
instead of a fixed number.
-* `FPR` chooses all features whose false positive rate meets some threshold.
+* `numTopFeatures` chooses a fixed number of top features according to a 
chi-squared test. This is akin to yielding 

spark git commit: [SPARK-18182] Expose ReplayListenerBus.read() overload which takes string iterator

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 6e6298154 -> b929537b6


[SPARK-18182] Expose ReplayListenerBus.read() overload which takes string 
iterator

The `ReplayListenerBus.read()` method is used when implementing a custom 
`ApplicationHistoryProvider`. The current interface only exposes a `read()` 
method which takes an `InputStream` and performs stream-to-lines conversion 
itself, but it would also be useful to expose an overloaded method which 
accepts an iterator of strings, thereby enabling events to be provided from 
non-`InputStream` sources.

Author: Josh Rosen 

Closes #15698 from JoshRosen/replay-listener-bus-interface.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b929537b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b929537b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b929537b

Branch: refs/heads/master
Commit: b929537b6eb0f8f34497c3dbceea8045bf5dffdb
Parents: 6e62981
Author: Josh Rosen 
Authored: Tue Nov 1 16:49:41 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 16:49:41 2016 -0700

--
 .../apache/spark/scheduler/ReplayListenerBus.scala   | 15 +--
 1 file changed, 13 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b929537b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala 
b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
index 2424586..0bd5a6b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -53,13 +53,24 @@ private[spark] class ReplayListenerBus extends 
SparkListenerBus with Logging {
   sourceName: String,
   maybeTruncated: Boolean = false,
   eventsFilter: ReplayEventsFilter = SELECT_ALL_FILTER): Unit = {
+val lines = Source.fromInputStream(logData).getLines()
+replay(lines, sourceName, maybeTruncated, eventsFilter)
+  }
 
+  /**
+   * Overloaded variant of [[replay()]] which accepts an iterator of lines 
instead of an
+   * [[InputStream]]. Exposed for use by custom ApplicationHistoryProvider 
implementations.
+   */
+  def replay(
+  lines: Iterator[String],
+  sourceName: String,
+  maybeTruncated: Boolean,
+  eventsFilter: ReplayEventsFilter): Unit = {
 var currentLine: String = null
 var lineNumber: Int = 0
 
 try {
-  val lineEntries = Source.fromInputStream(logData)
-.getLines()
+  val lineEntries = lines
 .zipWithIndex
 .filter { case (line, _) => eventsFilter(line) }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-17350][SQL] Disable default use of KryoSerializer in Thrift Server

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 01dd00830 -> 6e6298154


[SPARK-17350][SQL] Disable default use of KryoSerializer in Thrift Server

In SPARK-4761 / #3621 (December 2014) we enabled Kryo serialization by default 
in the Spark Thrift Server. However, I don't think that the original rationale 
for doing this still holds now that most Spark SQL serialization is now 
performed via encoders and our UnsafeRow format.

In addition, the use of Kryo as the default serializer can introduce 
performance problems because the creation of new KryoSerializer instances is 
expensive and we haven't performed instance-reuse optimizations in several code 
paths (including DirectTaskResult deserialization).

Given all of this, I propose to revert back to using JavaSerializer as the 
default serializer in the Thrift Server.

/cc liancheng

Author: Josh Rosen 

Closes #14906 from JoshRosen/disable-kryo-in-thriftserver.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e629815
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e629815
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e629815

Branch: refs/heads/master
Commit: 6e6298154aba63831a292117797798131a646869
Parents: 01dd008
Author: Josh Rosen 
Authored: Tue Nov 1 16:23:47 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 16:23:47 2016 -0700

--
 docs/configuration.md |  5 ++---
 .../apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala  | 10 --
 2 files changed, 2 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6e629815/docs/configuration.md
--
diff --git a/docs/configuration.md b/docs/configuration.md
index 780fc94..0017219 100644
--- a/docs/configuration.md
+++ b/docs/configuration.md
@@ -767,7 +767,7 @@ Apart from these, the following properties are also 
available, and may be useful
 
 
   spark.kryo.referenceTracking
-  true (false when using Spark SQL Thrift Server)
+  true
   
 Whether to track references to the same object when serializing data with 
Kryo, which is
 necessary if your object graphs have loops and useful for efficiency if 
they contain multiple
@@ -838,8 +838,7 @@ Apart from these, the following properties are also 
available, and may be useful
 
   spark.serializer
   
-org.apache.spark.serializer.JavaSerializer 
(org.apache.spark.serializer.
-KryoSerializer when using Spark SQL Thrift Server)
+org.apache.spark.serializer.JavaSerializer
   
   
 Class to use for serializing objects that will be sent over the network or 
need to be cached

http://git-wip-us.apache.org/repos/asf/spark/blob/6e629815/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
--
diff --git 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
index 6389115..78a3094 100644
--- 
a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
+++ 
b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala
@@ -19,8 +19,6 @@ package org.apache.spark.sql.hive.thriftserver
 
 import java.io.PrintStream
 
-import scala.collection.JavaConverters._
-
 import org.apache.spark.{SparkConf, SparkContext}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{SparkSession, SQLContext}
@@ -37,8 +35,6 @@ private[hive] object SparkSQLEnv extends Logging {
   def init() {
 if (sqlContext == null) {
   val sparkConf = new SparkConf(loadDefaults = true)
-  val maybeSerializer = sparkConf.getOption("spark.serializer")
-  val maybeKryoReferenceTracking = 
sparkConf.getOption("spark.kryo.referenceTracking")
   // If user doesn't specify the appName, we want to get 
[SparkSQL::localHostName] instead of
   // the default appName [SparkSQLCLIDriver] in cli or beeline.
   val maybeAppName = sparkConf
@@ -47,12 +43,6 @@ private[hive] object SparkSQLEnv extends Logging {
 
   sparkConf
 
.setAppName(maybeAppName.getOrElse(s"SparkSQL::${Utils.localHostName()}"))
-.set(
-  "spark.serializer",
-  
maybeSerializer.getOrElse("org.apache.spark.serializer.KryoSerializer"))
-.set(
-  "spark.kryo.referenceTracking",
-  maybeKryoReferenceTracking.getOrElse("false"))
 
   val sparkSession = 
SparkSession.builder.config(sparkConf).enableHiveSupport().getOrCreate()
   sparkContext = sparkSession.sparkContext



[1/2] spark git commit: Preparing Spark release v2.0.2-rc2

2016-11-01 Thread pwendell
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a01b95060 -> d401a74d4


Preparing Spark release v2.0.2-rc2


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6abe1ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6abe1ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6abe1ee

Branch: refs/heads/branch-2.0
Commit: a6abe1ee22141931614bf27a4f371c46d8379e33
Parents: a01b950
Author: Patrick Wendell 
Authored: Tue Nov 1 12:45:54 2016 -0700
Committer: Patrick Wendell 
Committed: Tue Nov 1 12:45:54 2016 -0700

--
 R/pkg/DESCRIPTION | 2 +-
 assembly/pom.xml  | 2 +-
 common/network-common/pom.xml | 2 +-
 common/network-shuffle/pom.xml| 2 +-
 common/network-yarn/pom.xml   | 2 +-
 common/sketch/pom.xml | 2 +-
 common/tags/pom.xml   | 2 +-
 common/unsafe/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 docs/_config.yml  | 4 ++--
 examples/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml | 2 +-
 external/flume-assembly/pom.xml   | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/java8-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml  | 2 +-
 external/kafka-0-10-sql/pom.xml   | 2 +-
 external/kafka-0-10/pom.xml   | 2 +-
 external/kafka-0-8-assembly/pom.xml   | 2 +-
 external/kafka-0-8/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml | 2 +-
 external/kinesis-asl/pom.xml  | 2 +-
 external/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml| 2 +-
 launcher/pom.xml  | 2 +-
 mllib-local/pom.xml   | 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 37 files changed, 38 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a6abe1ee/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index 0b01ca8..dfb7e22 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R Frontend for Apache Spark
-Version: 2.0.3
+Version: 2.0.2
 Date: 2016-08-27
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
 email = "shiva...@cs.berkeley.edu"),

http://git-wip-us.apache.org/repos/asf/spark/blob/a6abe1ee/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index de09fce..58feedc 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.3-SNAPSHOT
+2.0.2
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6abe1ee/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index 2ee104f..a75d222 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.3-SNAPSHOT
+2.0.2
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6abe1ee/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index b20f9e2..828a407 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.3-SNAPSHOT
+2.0.2
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a6abe1ee/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 06895c6..30891f3 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.3-SNAPSHOT
+2.0.2
 ../../pom.xml
   
 

http://git-wip-us.apache

spark git commit: [SPARK-17764][SQL] Add `to_json` supporting to convert nested struct column to JSON string

2016-11-01 Thread marmbrus
Repository: spark
Updated Branches:
  refs/heads/master cfac17ee1 -> 01dd00830


[SPARK-17764][SQL] Add `to_json` supporting to convert nested struct column to 
JSON string

## What changes were proposed in this pull request?

This PR proposes to add `to_json` function in contrast with `from_json` in 
Scala, Java and Python.

It would be useful to be able to convert the same column to and from JSON. Also, 
some data sources do not support nested types. If we are forced to save a 
DataFrame into those data sources, we can work around that with this function.

The usage is as below:

``` scala
val df = Seq(Tuple1(Tuple1(1))).toDF("a")
df.select(to_json($"a").as("json")).show()
```

``` bash
+--------+
|    json|
+--------+
|{"_1":1}|
+--------+
```
## How was this patch tested?

Unit tests in `JsonFunctionsSuite` and `JsonExpressionsSuite`.

Author: hyukjinkwon 

Closes #15354 from HyukjinKwon/SPARK-17764.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/01dd0083
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/01dd0083
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/01dd0083

Branch: refs/heads/master
Commit: 01dd0083011741c2bbe5ae1d2a25f2c9a1302b76
Parents: cfac17e
Author: hyukjinkwon 
Authored: Tue Nov 1 12:46:41 2016 -0700
Committer: Michael Armbrust 
Committed: Tue Nov 1 12:46:41 2016 -0700

--
 python/pyspark/sql/functions.py |  23 +++
 python/pyspark/sql/readwriter.py|   2 +-
 python/pyspark/sql/streaming.py |   2 +-
 .../catalyst/expressions/jsonExpressions.scala  |  48 -
 .../sql/catalyst/json/JacksonGenerator.scala| 197 ++
 .../spark/sql/catalyst/json/JacksonUtils.scala  |  26 +++
 .../expressions/JsonExpressionsSuite.scala  |   9 +
 .../scala/org/apache/spark/sql/Dataset.scala|   2 +-
 .../datasources/json/JacksonGenerator.scala | 198 ---
 .../datasources/json/JsonFileFormat.scala   |   2 +-
 .../scala/org/apache/spark/sql/functions.scala  |  44 -
 .../apache/spark/sql/JsonFunctionsSuite.scala   |  30 ++-
 12 files changed, 372 insertions(+), 211 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 7fa3fd2..45e3c22 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1744,6 +1744,29 @@ def from_json(col, schema, options={}):
 return Column(jc)
 
 
+@ignore_unicode_prefix
+@since(2.1)
+def to_json(col, options={}):
+"""
+Converts a column containing a [[StructType]] into a JSON string. Throws 
an exception,
+in the case of an unsupported type.
+
+:param col: name of column containing the struct
+:param options: options to control converting. accepts the same options as 
the json datasource
+
+>>> from pyspark.sql import Row
+>>> from pyspark.sql.types import *
+>>> data = [(1, Row(name='Alice', age=2))]
+>>> df = spark.createDataFrame(data, ("key", "value"))
+>>> df.select(to_json(df.value).alias("json")).collect()
+[Row(json=u'{"age":2,"name":"Alice"}')]
+"""
+
+sc = SparkContext._active_spark_context
+jc = sc._jvm.functions.to_json(_to_java_column(col), options)
+return Column(jc)
+
+
 @since(1.5)
 def size(col):
 """

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/python/pyspark/sql/readwriter.py
--
diff --git a/python/pyspark/sql/readwriter.py b/python/pyspark/sql/readwriter.py
index bc786ef..b0c51b1 100644
--- a/python/pyspark/sql/readwriter.py
+++ b/python/pyspark/sql/readwriter.py
@@ -161,7 +161,7 @@ class DataFrameReader(OptionUtils):
  mode=None, columnNameOfCorruptRecord=None, dateFormat=None, 
timestampFormat=None):
 """
 Loads a JSON file (`JSON Lines text format or newline-delimited JSON
-<[http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects 
(one object per
+<http://jsonlines.org/>`_) or an RDD of Strings storing JSON objects 
(one object per
 record) and returns the result as a :class`DataFrame`.
 
 If the ``schema`` parameter is not specified, this function goes

http://git-wip-us.apache.org/repos/asf/spark/blob/01dd0083/python/pyspark/sql/streaming.py
--
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 559647b..1c94413 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -641,7 +641,7 @@ class DataStreamReader(OptionUtils):
  timestampFormat=None):
 "

[2/2] spark git commit: Preparing development version 2.0.3-SNAPSHOT

2016-11-01 Thread pwendell
Preparing development version 2.0.3-SNAPSHOT


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d401a74d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d401a74d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d401a74d

Branch: refs/heads/branch-2.0
Commit: d401a74d40c7d29ea4f8ab6dfafb24baa2b87ece
Parents: a6abe1e
Author: Patrick Wendell 
Authored: Tue Nov 1 12:46:05 2016 -0700
Committer: Patrick Wendell 
Committed: Tue Nov 1 12:46:05 2016 -0700

--
 R/pkg/DESCRIPTION | 2 +-
 assembly/pom.xml  | 2 +-
 common/network-common/pom.xml | 2 +-
 common/network-shuffle/pom.xml| 2 +-
 common/network-yarn/pom.xml   | 2 +-
 common/sketch/pom.xml | 2 +-
 common/tags/pom.xml   | 2 +-
 common/unsafe/pom.xml | 2 +-
 core/pom.xml  | 2 +-
 docs/_config.yml  | 4 ++--
 examples/pom.xml  | 2 +-
 external/docker-integration-tests/pom.xml | 2 +-
 external/flume-assembly/pom.xml   | 2 +-
 external/flume-sink/pom.xml   | 2 +-
 external/flume/pom.xml| 2 +-
 external/java8-tests/pom.xml  | 2 +-
 external/kafka-0-10-assembly/pom.xml  | 2 +-
 external/kafka-0-10-sql/pom.xml   | 2 +-
 external/kafka-0-10/pom.xml   | 2 +-
 external/kafka-0-8-assembly/pom.xml   | 2 +-
 external/kafka-0-8/pom.xml| 2 +-
 external/kinesis-asl-assembly/pom.xml | 2 +-
 external/kinesis-asl/pom.xml  | 2 +-
 external/spark-ganglia-lgpl/pom.xml   | 2 +-
 graphx/pom.xml| 2 +-
 launcher/pom.xml  | 2 +-
 mllib-local/pom.xml   | 2 +-
 mllib/pom.xml | 2 +-
 pom.xml   | 2 +-
 repl/pom.xml  | 2 +-
 sql/catalyst/pom.xml  | 2 +-
 sql/core/pom.xml  | 2 +-
 sql/hive-thriftserver/pom.xml | 2 +-
 sql/hive/pom.xml  | 2 +-
 streaming/pom.xml | 2 +-
 tools/pom.xml | 2 +-
 yarn/pom.xml  | 2 +-
 37 files changed, 38 insertions(+), 38 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d401a74d/R/pkg/DESCRIPTION
--
diff --git a/R/pkg/DESCRIPTION b/R/pkg/DESCRIPTION
index dfb7e22..0b01ca8 100644
--- a/R/pkg/DESCRIPTION
+++ b/R/pkg/DESCRIPTION
@@ -1,7 +1,7 @@
 Package: SparkR
 Type: Package
 Title: R Frontend for Apache Spark
-Version: 2.0.2
+Version: 2.0.3
 Date: 2016-08-27
 Authors@R: c(person("Shivaram", "Venkataraman", role = c("aut", "cre"),
 email = "shiva...@cs.berkeley.edu"),

http://git-wip-us.apache.org/repos/asf/spark/blob/d401a74d/assembly/pom.xml
--
diff --git a/assembly/pom.xml b/assembly/pom.xml
index 58feedc..de09fce 100644
--- a/assembly/pom.xml
+++ b/assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.2
+2.0.3-SNAPSHOT
 ../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d401a74d/common/network-common/pom.xml
--
diff --git a/common/network-common/pom.xml b/common/network-common/pom.xml
index a75d222..2ee104f 100644
--- a/common/network-common/pom.xml
+++ b/common/network-common/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.2
+2.0.3-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d401a74d/common/network-shuffle/pom.xml
--
diff --git a/common/network-shuffle/pom.xml b/common/network-shuffle/pom.xml
index 828a407..b20f9e2 100644
--- a/common/network-shuffle/pom.xml
+++ b/common/network-shuffle/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.2
+2.0.3-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d401a74d/common/network-yarn/pom.xml
--
diff --git a/common/network-yarn/pom.xml b/common/network-yarn/pom.xml
index 30891f3..06895c6 100644
--- a/common/network-yarn/pom.xml
+++ b/common/network-yarn/pom.xml
@@ -22,7 +22,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.2
+2.0.3-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/d401a74d/common/sketch/pom.xml
--

[spark] Git Push Summary

2016-11-01 Thread pwendell
Repository: spark
Updated Tags:  refs/tags/v2.0.2-rc2 [created] a6abe1ee2

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18114][HOTFIX] Fix line-too-long style error from backport of SPARK-18114

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4176da8be -> a01b95060


[SPARK-18114][HOTFIX] Fix line-too-long style error from backport of SPARK-18114

## What changes were proposed in this pull request?

Fix style error introduced in cherry-pick of 
https://github.com/apache/spark/pull/15643 to branch-2.0.

## How was this patch tested?

Existing tests

Author: Sean Owen 

Closes #15719 from srowen/SPARK-18114.2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a01b9506
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a01b9506
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a01b9506

Branch: refs/heads/branch-2.0
Commit: a01b950602c4bb56c5a7d6213cdf6b7515ff36ec
Parents: 4176da8
Author: Sean Owen 
Authored: Tue Nov 1 12:43:50 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 12:43:50 2016 -0700

--
 .../spark/scheduler/cluster/mesos/MesosClusterScheduler.scala | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a01b9506/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index cbf97c3..94827e4 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -448,7 +448,8 @@ private[spark] class MesosClusterScheduler(
 }
 desc.schedulerProperties
   .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
-  .foreach { case (key, value) => options ++= Seq("--conf", 
s""""$key=${shellEscape(value)}"""".stripMargin) }
+  .foreach { case (key, value) =>
+options ++= Seq("--conf", 
s""""$key=${shellEscape(value)}"""".stripMargin) }
 options
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18167] Disable flaky SQLQuerySuite test

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master d0272b436 -> cfac17ee1


[SPARK-18167] Disable flaky SQLQuerySuite test

We now know it's a persistent environmental issue that is causing this test to 
sometimes fail. One hypothesis is that some configuration is leaked from 
another suite, and depending on suite ordering this can cause this test to fail.

I am planning on mining the jenkins logs to try to narrow down which suite 
could be causing this. For now, disable the test.

Author: Eric Liang 

Closes #15720 from ericl/disable-flaky-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cfac17ee
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cfac17ee
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cfac17ee

Branch: refs/heads/master
Commit: cfac17ee1cec414663b957228e469869eb7673c1
Parents: d0272b4
Author: Eric Liang 
Authored: Tue Nov 1 12:35:34 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 12:35:34 2016 -0700

--
 .../scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cfac17ee/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
index 8b91693..b9353b5 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/SQLQuerySuite.scala
@@ -1565,7 +1565,7 @@ class SQLQuerySuite extends QueryTest with SQLTestUtils 
with TestHiveSingleton {
 ).map(i => Row(i._1, i._2, i._3, i._4)))
   }
 
-  test("SPARK-10562: partition by column with mixed case name") {
+  ignore("SPARK-10562: partition by column with mixed case name") {
 def runOnce() {
   withTable("tbl10562") {
 val df = Seq(2012 -> "a").toDF("Year", "val")


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18148][SQL] Misleading Error Message for Aggregation Without Window/GroupBy

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 8a538c97b -> d0272b436


[SPARK-18148][SQL] Misleading Error Message for Aggregation Without 
Window/GroupBy

## What changes were proposed in this pull request?

Aggregation Without Window/GroupBy expressions will fail in `checkAnalysis`, 
the error message is a bit misleading, we should generate a more specific error 
message for this case.

For example,

```
spark.read.load("/some-data")
  .withColumn("date_dt", to_date($"date"))
  .withColumn("year", year($"date_dt"))
  .withColumn("week", weekofyear($"date_dt"))
  .withColumn("user_count", count($"userId"))
  .withColumn("daily_max_in_week", max($"user_count").over(weeklyWindow))
)
```

creates the following output:

```
org.apache.spark.sql.AnalysisException: expression '`randomColumn`' is neither 
present in the group by, nor is it an aggregate function. Add to group by or 
wrap in first() (or first_value) if you don't care which value you get.;
```

In the error message above, `randomColumn` doesn't appear in the 
query (actually it's added by function `withColumn`), so the message is not 
enough for the user to address the problem.
## How was this patch tested?

Manually test

Before:

```
scala> spark.sql("select col, count(col) from tbl")
org.apache.spark.sql.AnalysisException: expression 'tbl.`col`' is neither 
present in the group by, nor is it an aggregate function. Add to group by or 
wrap in first() (or first_value) if you don't care which value you get.;;
```

After:

```
scala> spark.sql("select col, count(col) from tbl")
org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, 
and 'tbl.`col`' is not an aggregate function. Wrap '(count(col#231L) AS 
count(col)#239L)' in windowing function(s) or wrap 'tbl.`col`' in first() (or 
first_value) if you don't care which value you get.;;
```

Also add new test sqls in `group-by.sql`.

Author: jiangxingbo 

Closes #15672 from jiangxb1987/groupBy-empty.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d0272b43
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d0272b43
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d0272b43

Branch: refs/heads/master
Commit: d0272b436512b71f04313e109d3d21a6e9deefca
Parents: 8a538c9
Author: jiangxingbo 
Authored: Tue Nov 1 11:25:11 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 11:25:11 2016 -0700

--
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  12 ++
 .../resources/sql-tests/inputs/group-by.sql |  41 +--
 .../sql-tests/results/group-by.sql.out  | 116 ---
 .../org/apache/spark/sql/SQLQuerySuite.scala|  35 --
 4 files changed, 140 insertions(+), 64 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d0272b43/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 9a7c2a9..3455a56 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -214,6 +214,18 @@ trait CheckAnalysis extends PredicateHelper {
 s"appear in the arguments of an aggregate function.")
   }
 }
+  case e: Attribute if groupingExprs.isEmpty =>
+// Collect all [[AggregateExpressions]]s.
+val aggExprs = aggregateExprs.filter(_.collect {
+  case a: AggregateExpression => a
+}.nonEmpty)
+failAnalysis(
+  s"grouping expressions sequence is empty, " +
+s"and '${e.sql}' is not an aggregate function. " +
+s"Wrap '${aggExprs.map(_.sql).mkString("(", ", ", ")")}' 
in windowing " +
+s"function(s) or wrap '${e.sql}' in first() (or 
first_value) " +
+s"if you don't care which value you get."
+)
   case e: Attribute if !groupingExprs.exists(_.semanticEquals(e)) 
=>
 failAnalysis(
   s"expression '${e.sql}' is neither present in the group by, 
" +

http://git-wip-us.apache.org/repos/asf/spark/blob/d0272b43/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
--
diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql 
b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
index 6741703..d950ec8 100644
--- a/sql/core/src/test/reso

spark git commit: [SPARK-18148][SQL] Misleading Error Message for Aggregation Without Window/GroupBy

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 58655f51f -> 4176da8be


[SPARK-18148][SQL] Misleading Error Message for Aggregation Without 
Window/GroupBy

## What changes were proposed in this pull request?

Aggregation Without Window/GroupBy expressions will fail in `checkAnalysis`, 
the error message is a bit misleading, we should generate a more specific error 
message for this case.

For example,

```
spark.read.load("/some-data")
  .withColumn("date_dt", to_date($"date"))
  .withColumn("year", year($"date_dt"))
  .withColumn("week", weekofyear($"date_dt"))
  .withColumn("user_count", count($"userId"))
  .withColumn("daily_max_in_week", max($"user_count").over(weeklyWindow))
)
```

creates the following output:

```
org.apache.spark.sql.AnalysisException: expression '`randomColumn`' is neither 
present in the group by, nor is it an aggregate function. Add to group by or 
wrap in first() (or first_value) if you don't care which value you get.;
```

In the error message above, `randomColumn` doesn't appear in the 
query (actually it's added by function `withColumn`), so the message is not 
enough for the user to address the problem.
## How was this patch tested?

Manually test

Before:

```
scala> spark.sql("select col, count(col) from tbl")
org.apache.spark.sql.AnalysisException: expression 'tbl.`col`' is neither 
present in the group by, nor is it an aggregate function. Add to group by or 
wrap in first() (or first_value) if you don't care which value you get.;;
```

After:

```
scala> spark.sql("select col, count(col) from tbl")
org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, 
and 'tbl.`col`' is not an aggregate function. Wrap '(count(col#231L) AS 
count(col)#239L)' in windowing function(s) or wrap 'tbl.`col`' in first() (or 
first_value) if you don't care which value you get.;;
```

Also add new test sqls in `group-by.sql`.

Author: jiangxingbo 

Closes #15672 from jiangxb1987/groupBy-empty.

(cherry picked from commit d0272b436512b71f04313e109d3d21a6e9deefca)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4176da8b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4176da8b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4176da8b

Branch: refs/heads/branch-2.0
Commit: 4176da8be57bb0b36b9f2c580a547713c2048d17
Parents: 58655f5
Author: jiangxingbo 
Authored: Tue Nov 1 11:25:11 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 11:25:18 2016 -0700

--
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  12 ++
 .../resources/sql-tests/inputs/group-by.sql |  41 +--
 .../sql-tests/results/group-by.sql.out  | 116 ---
 .../org/apache/spark/sql/SQLQuerySuite.scala|  35 --
 4 files changed, 140 insertions(+), 64 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4176da8b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index 790566c..10e0eef 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -214,6 +214,18 @@ trait CheckAnalysis extends PredicateHelper {
 s"appear in the arguments of an aggregate function.")
   }
 }
+  case e: Attribute if groupingExprs.isEmpty =>
+// Collect all [[AggregateExpressions]]s.
+val aggExprs = aggregateExprs.filter(_.collect {
+  case a: AggregateExpression => a
+}.nonEmpty)
+failAnalysis(
+  s"grouping expressions sequence is empty, " +
+s"and '${e.sql}' is not an aggregate function. " +
+s"Wrap '${aggExprs.map(_.sql).mkString("(", ", ", ")")}' 
in windowing " +
+s"function(s) or wrap '${e.sql}' in first() (or 
first_value) " +
+s"if you don't care which value you get."
+)
   case e: Attribute if !groupingExprs.exists(_.semanticEquals(e)) 
=>
 failAnalysis(
   s"expression '${e.sql}' is neither present in the group by, 
" +

http://git-wip-us.apache.org/repos/asf/spark/blob/4176da8b/sql/core/src/test/resources/sql-tests/inputs/group-by.sql
--
diff --git a/sql/core/src/test/resources/sql-tests/inputs/group-by.sql 
b/sql/core/s

spark git commit: [SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4d2672a40 -> 58655f51f


[SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset

## What changes were proposed in this pull request?
Like 
[Dataset.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156),
 KeyValueGroupedDataset should mark the queryExecution as transient.

As mentioned in the Jira ticket, without transient we saw serialization issues 
like

```
Caused by: java.io.NotSerializableException: 
org.apache.spark.sql.execution.QueryExecution
Serialization stack:
- object not serializable (class: 
org.apache.spark.sql.execution.QueryExecution, value: ==
```

## How was this patch tested?

Run the query which is specified in the Jira ticket before and after:
```
val a = spark.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey(
{x:(Int,Int)=>x._1}
)
val mappedGroups = grouped.mapGroups((k,x)=>
{(k,1)}
)
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=>
{ val simpley = yyy.value; 1 }
)
```

Author: Ergin Seyfe 

Closes #15706 from seyfe/keyvaluegrouped_serialization.

(cherry picked from commit 8a538c97b556f80f67c80519af0ce879557050d5)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/58655f51
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/58655f51
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/58655f51

Branch: refs/heads/branch-2.0
Commit: 58655f51f65d852ec65a65b54f26b3c8eac8cc60
Parents: 4d2672a
Author: Ergin Seyfe 
Authored: Tue Nov 1 11:18:42 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 11:18:50 2016 -0700

--
 .../scala/org/apache/spark/repl/ReplSuite.scala| 17 +
 .../apache/spark/sql/KeyValueGroupedDataset.scala  |  2 +-
 2 files changed, 18 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/58655f51/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
--
diff --git 
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index f7d7a4f..8deafe3 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite {
 assertDoesNotContain("AssertionError", output)
 assertDoesNotContain("Exception", output)
   }
+
+  test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
+val resultValue = 12345
+val output = runInterpreter("local",
+  s"""
+ |val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
+ |val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1))
+ |val broadcasted = sc.broadcast($resultValue)
+ |
+ |// Using broadcast triggers serialization issue in 
KeyValueGroupedDataset
+ |val dataset = mapGroups.map(_ => broadcasted.value)
+ |dataset.collect()
+  """.stripMargin)
+assertDoesNotContain("error:", output)
+assertDoesNotContain("Exception", output)
+assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/58655f51/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 8eec42a..407d036 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -39,7 +39,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator
 class KeyValueGroupedDataset[K, V] private[sql](
 kEncoder: Encoder[K],
 vEncoder: Encoder[V],
-val queryExecution: QueryExecution,
+@transient val queryExecution: QueryExecution,
 private val dataAttributes: Seq[Attribute],
 private val groupingAttributes: Seq[Attribute]) extends Serializable {
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 8cdf143f4 -> 8a538c97b


[SPARK-18189][SQL] Fix serialization issue in KeyValueGroupedDataset

## What changes were proposed in this pull request?
Like 
[Dataset.scala](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala#L156),
 KeyValueGroupedDataset should mark the queryExecution as transient.

As mentioned in the Jira ticket, without transient we saw serialization issues 
like

```
Caused by: java.io.NotSerializableException: 
org.apache.spark.sql.execution.QueryExecution
Serialization stack:
- object not serializable (class: 
org.apache.spark.sql.execution.QueryExecution, value: ==
```

## How was this patch tested?

Run the query which is specified in the Jira ticket before and after:
```
val a = spark.createDataFrame(sc.parallelize(Seq((1,2),(3,4)))).as[(Int,Int)]
val grouped = a.groupByKey(
{x:(Int,Int)=>x._1}
)
val mappedGroups = grouped.mapGroups((k,x)=>
{(k,1)}
)
val yyy = sc.broadcast(1)
val last = mappedGroups.rdd.map(xx=>
{ val simpley = yyy.value; 1 }
)
```

Author: Ergin Seyfe 

Closes #15706 from seyfe/keyvaluegrouped_serialization.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8a538c97
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8a538c97
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8a538c97

Branch: refs/heads/master
Commit: 8a538c97b556f80f67c80519af0ce879557050d5
Parents: 8cdf143
Author: Ergin Seyfe 
Authored: Tue Nov 1 11:18:42 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 11:18:42 2016 -0700

--
 .../scala/org/apache/spark/repl/ReplSuite.scala| 17 +
 .../apache/spark/sql/KeyValueGroupedDataset.scala  |  2 +-
 2 files changed, 18 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8a538c97/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
--
diff --git 
a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala 
b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
index 9262e93..96d2dfc 100644
--- a/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
+++ b/repl/scala-2.11/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -473,4 +473,21 @@ class ReplSuite extends SparkFunSuite {
 assertDoesNotContain("AssertionError", output)
 assertDoesNotContain("Exception", output)
   }
+
+  test("SPARK-18189: Fix serialization issue in KeyValueGroupedDataset") {
+val resultValue = 12345
+val output = runInterpreter("local",
+  s"""
+ |val keyValueGrouped = Seq((1, 2), (3, 4)).toDS().groupByKey(_._1)
+ |val mapGroups = keyValueGrouped.mapGroups((k, v) => (k, 1))
+ |val broadcasted = sc.broadcast($resultValue)
+ |
+ |// Using broadcast triggers serialization issue in 
KeyValueGroupedDataset
+ |val dataset = mapGroups.map(_ => broadcasted.value)
+ |dataset.collect()
+  """.stripMargin)
+assertDoesNotContain("error:", output)
+assertDoesNotContain("Exception", output)
+assertContains(s": Array[Int] = Array($resultValue, $resultValue)", output)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8a538c97/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
index 4cb0313..31ce8eb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/KeyValueGroupedDataset.scala
@@ -40,7 +40,7 @@ import org.apache.spark.sql.expressions.ReduceAggregator
 class KeyValueGroupedDataset[K, V] private[sql](
 kEncoder: Encoder[K],
 vEncoder: Encoder[V],
-val queryExecution: QueryExecution,
+@transient val queryExecution: QueryExecution,
 private val dataAttributes: Seq[Attribute],
 private val groupingAttributes: Seq[Attribute]) extends Serializable {
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18103][FOLLOW-UP][SQL][MINOR] Rename `MetadataLogFileCatalog` to `MetadataLogFileIndex`

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 8ac09108f -> 8cdf143f4


[SPARK-18103][FOLLOW-UP][SQL][MINOR] Rename `MetadataLogFileCatalog` to 
`MetadataLogFileIndex`

## What changes were proposed in this pull request?

This is a follow-up to https://github.com/apache/spark/pull/15634.

## How was this patch tested?

N/A

Author: Liwei Lin 

Closes #15712 from lw-lin/18103.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8cdf143f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8cdf143f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8cdf143f

Branch: refs/heads/master
Commit: 8cdf143f4b1ca5c6bc0256808e6f42d9ef299cbd
Parents: 8ac0910
Author: Liwei Lin 
Authored: Tue Nov 1 11:17:35 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 11:17:35 2016 -0700

--
 .../streaming/MetadataLogFileCatalog.scala  | 60 
 .../streaming/MetadataLogFileIndex.scala| 60 
 2 files changed, 60 insertions(+), 60 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8cdf143f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
deleted file mode 100644
index aeaa134..000
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileCatalog.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import scala.collection.mutable
-
-import org.apache.hadoop.fs.{FileStatus, Path}
-
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.execution.datasources._
-
-
-/**
- * A [[FileIndex]] that generates the list of files to processing by reading 
them from the
- * metadata log files generated by the [[FileStreamSink]].
- */
-class MetadataLogFileIndex(sparkSession: SparkSession, path: Path)
-  extends PartitioningAwareFileIndex(sparkSession, Map.empty, None) {
-
-  private val metadataDirectory = new Path(path, FileStreamSink.metadataDir)
-  logInfo(s"Reading streaming file log from $metadataDirectory")
-  private val metadataLog =
-new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, 
metadataDirectory.toUri.toString)
-  private val allFilesFromLog = 
metadataLog.allFiles().map(_.toFileStatus).filterNot(_.isDirectory)
-  private var cachedPartitionSpec: PartitionSpec = _
-
-  override protected val leafFiles: mutable.LinkedHashMap[Path, FileStatus] = {
-new mutable.LinkedHashMap ++= allFilesFromLog.map(f => f.getPath -> f)
-  }
-
-  override protected val leafDirToChildrenFiles: Map[Path, Array[FileStatus]] 
= {
-allFilesFromLog.toArray.groupBy(_.getPath.getParent)
-  }
-
-  override def rootPaths: Seq[Path] = path :: Nil
-
-  override def refresh(): Unit = { }
-
-  override def partitionSpec(): PartitionSpec = {
-if (cachedPartitionSpec == null) {
-  cachedPartitionSpec = inferPartitioning()
-}
-cachedPartitionSpec
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/8cdf143f/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
new file mode 100644
index 000..aeaa134
--- /dev/null
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MetadataLogFileIndex.scala
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding

spark git commit: [SPARK-17848][ML] Move LabelCol datatype cast into Predictor.fit

2016-11-01 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 0cba535af -> 8ac09108f


[SPARK-17848][ML] Move LabelCol datatype cast into Predictor.fit

## What changes were proposed in this pull request?

1. Move the label cast into `Predictor`.
2. Then remove the now-unnecessary casts.
## How was this patch tested?

existing tests

Author: Zheng RuiFeng 

Closes #15414 from zhengruifeng/move_cast.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8ac09108
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8ac09108
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8ac09108

Branch: refs/heads/master
Commit: 8ac09108fcf3fb62a812333a5b386b566a9d98ec
Parents: 0cba535
Author: Zheng RuiFeng 
Authored: Tue Nov 1 10:46:36 2016 -0700
Committer: Joseph K. Bradley 
Committed: Tue Nov 1 10:46:36 2016 -0700

--
 .../scala/org/apache/spark/ml/Predictor.scala   | 12 ++-
 .../spark/ml/classification/Classifier.scala|  4 +-
 .../spark/ml/classification/GBTClassifier.scala |  2 +-
 .../ml/classification/LogisticRegression.scala  |  2 +-
 .../spark/ml/classification/NaiveBayes.scala|  2 +-
 .../GeneralizedLinearRegression.scala   |  2 +-
 .../spark/ml/regression/LinearRegression.scala  |  2 +-
 .../org/apache/spark/ml/PredictorSuite.scala| 82 
 .../LogisticRegressionSuite.scala   |  1 -
 9 files changed, 98 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8ac09108/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala 
b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
index e29d7f4..aa92edd 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Predictor.scala
@@ -58,7 +58,8 @@ private[ml] trait PredictorParams extends Params
 
 /**
  * :: DeveloperApi ::
- * Abstraction for prediction problems (regression and classification).
+ * Abstraction for prediction problems (regression and classification). It 
accepts all NumericType
+ * labels and will automatically cast it to DoubleType in [[fit()]].
  *
  * @tparam FeaturesType  Type of features.
  *   E.g., [[org.apache.spark.mllib.linalg.VectorUDT]] for 
vector features.
@@ -87,7 +88,12 @@ abstract class Predictor[
 // This handles a few items such as schema validation.
 // Developers only need to implement train().
 transformSchema(dataset.schema, logging = true)
-copyValues(train(dataset).setParent(this))
+
+// Cast LabelCol to DoubleType and keep the metadata.
+val labelMeta = dataset.schema($(labelCol)).metadata
+val casted = dataset.withColumn($(labelCol), 
col($(labelCol)).cast(DoubleType), labelMeta)
+
+copyValues(train(casted).setParent(this))
   }
 
   override def copy(extra: ParamMap): Learner
@@ -121,7 +127,7 @@ abstract class Predictor[
* and put it in an RDD with strong types.
*/
   protected def extractLabeledPoints(dataset: Dataset[_]): RDD[LabeledPoint] = 
{
-dataset.select(col($(labelCol)).cast(DoubleType), 
col($(featuresCol))).rdd.map {
+dataset.select(col($(labelCol)), col($(featuresCol))).rdd.map {
   case Row(label: Double, features: Vector) => LabeledPoint(label, 
features)
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/8ac09108/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
index d1b21b1..a3da306 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/Classifier.scala
@@ -71,7 +71,7 @@ abstract class Classifier[
* and put it in an RDD with strong types.
*
* @param dataset  DataFrame with columns for labels 
([[org.apache.spark.sql.types.NumericType]])
-   * and features ([[Vector]]). Labels are cast to 
[[DoubleType]].
+   * and features ([[Vector]]).
* @param numClasses  Number of classes label can take.  Labels must be 
integers in the range
*[0, numClasses).
* @throws SparkException  if any label is not an integer >= 0
@@ -79,7 +79,7 @@ abstract class Classifier[
   protected def extractLabeledPoints(dataset: Dataset[_], numClasses: Int): 
RDD[LabeledPoint] = {
 require(numClasses > 0, s"Classifier (in extractLabeledPoints) found 
numClasses =" +
   s" $numClasses, but requires numClasses > 0.")
-dataset.select(col($(labelCol)).cast(DoubleType

spark git commit: Revert "[SPARK-16839][SQL] redundant aliases after cleanupAliases"

2016-11-01 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 5441a6269 -> 0cba535af


Revert "[SPARK-16839][SQL] redundant aliases after cleanupAliases"

This reverts commit 5441a6269e00e3903ae6c1ea8deb4ddf3d2e9975.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0cba535a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0cba535a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0cba535a

Branch: refs/heads/master
Commit: 0cba535af3c65618f342fa2d7db9647f5e6f6f1b
Parents: 5441a62
Author: Herman van Hovell 
Authored: Tue Nov 1 17:30:37 2016 +0100
Committer: Herman van Hovell 
Committed: Tue Nov 1 17:30:37 2016 +0100

--
 R/pkg/inst/tests/testthat/test_sparkSQL.R   |  12 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  53 +++--
 .../catalyst/analysis/FunctionRegistry.scala|   2 +-
 .../sql/catalyst/expressions/Projection.scala   |   2 +
 .../expressions/complexTypeCreator.scala| 211 +--
 .../spark/sql/catalyst/parser/AstBuilder.scala  |   4 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala   |  38 +---
 .../catalyst/expressions/ComplexTypeSuite.scala |   1 +
 .../scala/org/apache/spark/sql/Column.scala |   3 -
 .../command/AnalyzeColumnCommand.scala  |   4 +-
 .../resources/sql-tests/inputs/group-by.sql |   2 +-
 .../sql-tests/results/group-by.sql.out  |   4 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |  20 +-
 .../resources/sqlgen/subquery_in_having_2.sql   |   2 +-
 .../sql/catalyst/LogicalPlanToSQLSuite.scala|  12 +-
 15 files changed, 200 insertions(+), 170 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0cba535a/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 5002655..9289db5 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1222,16 +1222,16 @@ test_that("column functions", {
   # Test struct()
   df <- createDataFrame(list(list(1L, 2L, 3L), list(4L, 5L, 6L)),
 schema = c("a", "b", "c"))
-  result <- collect(select(df, alias(struct("a", "c"), "d")))
+  result <- collect(select(df, struct("a", "c")))
   expected <- data.frame(row.names = 1:2)
-  expected$"d" <- list(listToStruct(list(a = 1L, c = 3L)),
-  listToStruct(list(a = 4L, c = 6L)))
+  expected$"struct(a, c)" <- list(listToStruct(list(a = 1L, c = 3L)),
+ listToStruct(list(a = 4L, c = 6L)))
   expect_equal(result, expected)
 
-  result <- collect(select(df, alias(struct(df$a, df$b), "d")))
+  result <- collect(select(df, struct(df$a, df$b)))
   expected <- data.frame(row.names = 1:2)
-  expected$"d" <- list(listToStruct(list(a = 1L, b = 2L)),
-  listToStruct(list(a = 4L, b = 5L)))
+  expected$"struct(a, b)" <- list(listToStruct(list(a = 1L, b = 2L)),
+ listToStruct(list(a = 4L, b = 5L)))
   expect_equal(result, expected)
 
   # Test encode(), decode()

http://git-wip-us.apache.org/repos/asf/spark/blob/0cba535a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 5011f2f..f8f4799 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -31,7 +31,7 @@ import 
org.apache.spark.sql.catalyst.optimizer.BooleanSimplification
 import org.apache.spark.sql.catalyst.plans._
 import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, _}
 import org.apache.spark.sql.catalyst.rules._
-import org.apache.spark.sql.catalyst.trees.TreeNodeRef
+import org.apache.spark.sql.catalyst.trees.{TreeNodeRef}
 import org.apache.spark.sql.catalyst.util.toPrettySQL
 import org.apache.spark.sql.types._
 
@@ -83,7 +83,6 @@ class Analyzer(
   ResolveTableValuedFunctions ::
   ResolveRelations ::
   ResolveReferences ::
-  ResolveCreateNamedStruct ::
   ResolveDeserializer ::
   ResolveNewInstance ::
   ResolveUpCast ::
@@ -654,12 +653,11 @@ class Analyzer(
 case s: Star => s.expand(child, resolver)
 case o => o :: Nil
   })
-case c: CreateNamedStruct if containsStar(c.valExprs) =>
-  val newChildren = c.children.grouped(2).flatMap {
-case Seq(k, s : Star) => CreateStruct(s.expand(child, 
resolver)).children
-case 

spark git commit: [SPARK-16839][SQL] redundant aliases after cleanupAliases

2016-11-01 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master f7c145d8c -> 5441a6269


[SPARK-16839][SQL] redundant aliases after cleanupAliases

## What changes were proposed in this pull request?

Simplify struct creation, especially the aspect of `CleanupAliases` which 
missed some aliases when handling trees created by `CreateStruct`.

This PR includes:

1. A failing test (create struct with nested aliases, some of the aliases 
survive `CleanupAliases`).
2. A fix that transforms `CreateStruct` into a `CreateNamedStruct` constructor, 
effectively eliminating `CreateStruct` from all expression trees.
3. A `NamePlaceHolder` used by `CreateStruct` when column names cannot be 
extracted from unresolved `NamedExpression`.
4. A new Analyzer rule that resolves `NamePlaceHolder` into a string literal 
once the `NamedExpression` is resolved.
5. `CleanupAliases` code was simplified as it no longer has to deal with 
`CreateStruct`'s top level columns.

## How was this patch tested?

Ran all test suites in package org.apache.spark.sql, especially the 
analysis suite, and made sure the added test initially fails; after applying 
the suggested fix, reran the entire analysis package successfully.

Modified a few tests that expected `CreateStruct`, which is now transformed into 
`CreateNamedStruct`.

Credit goes to hvanhovell for assisting with this PR.

Author: eyal farago 
Author: eyal farago 
Author: Herman van Hovell 
Author: Eyal Farago 
Author: Hyukjin Kwon 
Author: eyalfa 

Closes #1 from eyalfa/SPARK-16839_redundant_aliases_after_cleanupAliases.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5441a626
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5441a626
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5441a626

Branch: refs/heads/master
Commit: 5441a6269e00e3903ae6c1ea8deb4ddf3d2e9975
Parents: f7c145d
Author: eyal farago 
Authored: Tue Nov 1 17:12:20 2016 +0100
Committer: Herman van Hovell 
Committed: Tue Nov 1 17:12:20 2016 +0100

--
 R/pkg/inst/tests/testthat/test_sparkSQL.R   |  12 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala  |  53 ++---
 .../catalyst/analysis/FunctionRegistry.scala|   2 +-
 .../sql/catalyst/expressions/Projection.scala   |   2 -
 .../expressions/complexTypeCreator.scala| 211 ++-
 .../spark/sql/catalyst/parser/AstBuilder.scala  |   4 +-
 .../sql/catalyst/analysis/AnalysisSuite.scala   |  38 +++-
 .../catalyst/expressions/ComplexTypeSuite.scala |   1 -
 .../scala/org/apache/spark/sql/Column.scala |   3 +
 .../command/AnalyzeColumnCommand.scala  |   4 +-
 .../resources/sql-tests/inputs/group-by.sql |   2 +-
 .../sql-tests/results/group-by.sql.out  |   4 +-
 .../apache/spark/sql/hive/test/TestHive.scala   |  20 +-
 .../resources/sqlgen/subquery_in_having_2.sql   |   2 +-
 .../sql/catalyst/LogicalPlanToSQLSuite.scala|  12 +-
 15 files changed, 170 insertions(+), 200 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5441a626/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R 
b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 9289db5..5002655 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -1222,16 +1222,16 @@ test_that("column functions", {
   # Test struct()
   df <- createDataFrame(list(list(1L, 2L, 3L), list(4L, 5L, 6L)),
 schema = c("a", "b", "c"))
-  result <- collect(select(df, struct("a", "c")))
+  result <- collect(select(df, alias(struct("a", "c"), "d")))
   expected <- data.frame(row.names = 1:2)
-  expected$"struct(a, c)" <- list(listToStruct(list(a = 1L, c = 3L)),
- listToStruct(list(a = 4L, c = 6L)))
+  expected$"d" <- list(listToStruct(list(a = 1L, c = 3L)),
+  listToStruct(list(a = 4L, c = 6L)))
   expect_equal(result, expected)
 
-  result <- collect(select(df, struct(df$a, df$b)))
+  result <- collect(select(df, alias(struct(df$a, df$b), "d")))
   expected <- data.frame(row.names = 1:2)
-  expected$"struct(a, b)" <- list(listToStruct(list(a = 1L, b = 2L)),
- listToStruct(list(a = 4L, b = 5L)))
+  expected$"d" <- list(listToStruct(list(a = 1L, b = 2L)),
+  listToStruct(list(a = 4L, b = 5L)))
   expect_equal(result, expected)
 
   # Test encode(), decode()

http://git-wip-us.apache.org/repos/asf/spark/blob/5441a626/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src

spark git commit: [SPARK-17996][SQL] Fix unqualified catalog.getFunction(...)

2016-11-01 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 9b377aa49 -> f7c145d8c


[SPARK-17996][SQL] Fix unqualified catalog.getFunction(...)

## What changes were proposed in this pull request?

Currently an unqualified `getFunction(..)` call returns a wrong result; the 
returned function is shown as a temporary function without a database. For 
example:

```
scala> sql("create function fn1 as 
'org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs'")
res0: org.apache.spark.sql.DataFrame = []

scala> spark.catalog.getFunction("fn1")
res1: org.apache.spark.sql.catalog.Function = Function[name='fn1', 
className='org.apache.hadoop.hive.ql.udf.generic.GenericUDFAbs', 
isTemporary='true']
```

This PR fixes this by adding database information to ExpressionInfo (which is 
used to store the function information).
## How was this patch tested?

Added more thorough tests to `CatalogSuite`.

Author: Herman van Hovell 

Closes #15542 from hvanhovell/SPARK-17996.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f7c145d8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f7c145d8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f7c145d8

Branch: refs/heads/master
Commit: f7c145d8ce14b23019099c509d5a2b6dfb1fe62c
Parents: 9b377aa
Author: Herman van Hovell 
Authored: Tue Nov 1 15:41:45 2016 +0100
Committer: Herman van Hovell 
Committed: Tue Nov 1 15:41:45 2016 +0100

--
 .../sql/catalyst/expressions/ExpressionInfo.java | 14 --
 .../sql/catalyst/analysis/FunctionRegistry.scala |  2 +-
 .../spark/sql/catalyst/catalog/SessionCatalog.scala  | 10 --
 .../spark/sql/execution/command/functions.scala  |  5 +++--
 .../org/apache/spark/sql/internal/CatalogImpl.scala  |  6 +++---
 .../org/apache/spark/sql/internal/CatalogSuite.scala | 15 ---
 6 files changed, 39 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f7c145d8/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java
--
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java
index ba8e9cb..4565ed4 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/ExpressionInfo.java
@@ -25,6 +25,7 @@ public class ExpressionInfo {
 private String usage;
 private String name;
 private String extended;
+private String db;
 
 public String getClassName() {
 return className;
@@ -42,14 +43,23 @@ public class ExpressionInfo {
 return extended;
 }
 
-public ExpressionInfo(String className, String name, String usage, String 
extended) {
+public String getDb() {
+return db;
+}
+
+public ExpressionInfo(String className, String db, String name, String 
usage, String extended) {
 this.className = className;
+this.db = db;
 this.name = name;
 this.usage = usage;
 this.extended = extended;
 }
 
 public ExpressionInfo(String className, String name) {
-this(className, name, null, null);
+this(className, null, name, null, null);
+}
+
+public ExpressionInfo(String className, String db, String name) {
+this(className, db, name, null, null);
 }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f7c145d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index b05f4f6..3e836ca 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -495,7 +495,7 @@ object FunctionRegistry {
 val clazz = scala.reflect.classTag[T].runtimeClass
 val df = clazz.getAnnotation(classOf[ExpressionDescription])
 if (df != null) {
-  new ExpressionInfo(clazz.getCanonicalName, name, df.usage(), 
df.extended())
+  new ExpressionInfo(clazz.getCanonicalName, null, name, df.usage(), 
df.extended())
 } else {
   new ExpressionInfo(clazz.getCanonicalName, name)
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/f7c145d8/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
---

spark git commit: [SPARK-18114][MESOS] Fix mesos cluster scheduler generage command option error

2016-11-01 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 e06f43e33 -> 4d2672a40


[SPARK-18114][MESOS] Fix mesos cluster scheduler generage command option error

Enclose the --conf option value in double quotes ("") to support multi-value 
configs such as spark.driver.extraJavaOptions; without the quotes, the driver 
fails to start.

Jenkins Tests.

Tested in our production environment and with unit tests. It is a very small change.

Author: Wang Lei 

Closes #15643 from LeightonWong/messos-cluster.

(cherry picked from commit 9b377aa49f14af31f54164378d60e0fdea2142e5)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4d2672a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4d2672a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4d2672a4

Branch: refs/heads/branch-2.0
Commit: 4d2672a408b6061b0e20b622960f7c6f14271275
Parents: e06f43e
Author: Wang Lei 
Authored: Tue Nov 1 13:42:10 2016 +
Committer: Sean Owen 
Committed: Tue Nov 1 13:43:47 2016 +

--
 .../spark/scheduler/cluster/mesos/MesosClusterScheduler.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4d2672a4/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 73bd4c5..cbf97c3 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -448,7 +448,7 @@ private[spark] class MesosClusterScheduler(
 }
 desc.schedulerProperties
   .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
-  .foreach { case (key, value) => options ++= Seq("--conf", 
s"$key=${shellEscape(value)}") }
+  .foreach { case (key, value) => options ++= Seq("--conf", 
s""""$key=${shellEscape(value)}"""".stripMargin) }
 options
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-18114][MESOS] Fix mesos cluster scheduler generage command option error

2016-11-01 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master ec6f479bb -> 9b377aa49


[SPARK-18114][MESOS] Fix mesos cluster scheduler generage command option error

## What changes were proposed in this pull request?

Enclose the --conf option value in double quotes ("") to support multi-value 
configs such as spark.driver.extraJavaOptions; without the quotes, the driver 
fails to start.
## How was this patch tested?

Jenkins Tests.

Tested in our production environment and with unit tests. It is a very small change.

Author: Wang Lei 

Closes #15643 from LeightonWong/messos-cluster.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b377aa4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b377aa4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b377aa4

Branch: refs/heads/master
Commit: 9b377aa49f14af31f54164378d60e0fdea2142e5
Parents: ec6f479
Author: Wang Lei 
Authored: Tue Nov 1 13:42:10 2016 +
Committer: Sean Owen 
Committed: Tue Nov 1 13:42:10 2016 +

--
 .../spark/scheduler/cluster/mesos/MesosClusterScheduler.scala  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9b377aa4/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
--
diff --git 
a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 
b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 635712c..8db1d12 100644
--- 
a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ 
b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -482,7 +482,7 @@ private[spark] class MesosClusterScheduler(
   .filter { case (key, _) => !replicatedOptionsBlacklist.contains(key) }
   .toMap
 (defaultConf ++ driverConf).foreach { case (key, value) =>
-  options ++= Seq("--conf", s"$key=${shellEscape(value)}") }
+  options ++= Seq("--conf", 
s""""$key=${shellEscape(value)}"""".stripMargin) }
 
 options
   }


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-16881][MESOS] Migrate Mesos configs to use ConfigEntry

2016-11-01 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master e34b4e126 -> ec6f479bb


[SPARK-16881][MESOS] Migrate Mesos configs to use ConfigEntry

## What changes were proposed in this pull request?

Migrate Mesos configs to use ConfigEntry
## How was this patch tested?

Jenkins Tests

Author: Sandeep Singh 

Closes #15654 from techaddict/SPARK-16881.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ec6f479b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ec6f479b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ec6f479b

Branch: refs/heads/master
Commit: ec6f479bb1d14c9eb45e0418353007be0416e4c5
Parents: e34b4e1
Author: Sandeep Singh 
Authored: Tue Nov 1 13:18:11 2016 +
Committer: Sean Owen 
Committed: Tue Nov 1 13:18:11 2016 +

--
 .../deploy/mesos/MesosClusterDispatcher.scala   |  9 +--
 .../mesos/MesosExternalShuffleService.scala |  3 +-
 .../org/apache/spark/deploy/mesos/config.scala  | 59 
 .../deploy/mesos/ui/MesosClusterPage.scala  |  3 +-
 4 files changed, 68 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ec6f479b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
--
diff --git 
a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
 
b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
index 73b6ca3..7d6693b 100644
--- 
a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
+++ 
b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosClusterDispatcher.scala
@@ -20,6 +20,7 @@ package org.apache.spark.deploy.mesos
 import java.util.concurrent.CountDownLatch
 
 import org.apache.spark.{SecurityManager, SparkConf}
+import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.deploy.mesos.ui.MesosClusterUI
 import org.apache.spark.deploy.rest.mesos.MesosRestServer
 import org.apache.spark.internal.Logging
@@ -51,7 +52,7 @@ private[mesos] class MesosClusterDispatcher(
   extends Logging {
 
   private val publicAddress = 
Option(conf.getenv("SPARK_PUBLIC_DNS")).getOrElse(args.host)
-  private val recoveryMode = conf.get("spark.deploy.recoveryMode", 
"NONE").toUpperCase()
+  private val recoveryMode = conf.get(RECOVERY_MODE).toUpperCase()
   logInfo("Recovery mode in Mesos dispatcher set to: " + recoveryMode)
 
   private val engineFactory = recoveryMode match {
@@ -74,7 +75,7 @@ private[mesos] class MesosClusterDispatcher(
 
   def start(): Unit = {
 webUi.bind()
-scheduler.frameworkUrl = conf.get("spark.mesos.dispatcher.webui.url", 
webUi.activeWebUiUrl)
+scheduler.frameworkUrl = 
conf.get(DISPATCHER_WEBUI_URL).getOrElse(webUi.activeWebUiUrl)
 scheduler.start()
 server.start()
   }
@@ -99,8 +100,8 @@ private[mesos] object MesosClusterDispatcher extends Logging 
{
 conf.setMaster(dispatcherArgs.masterUrl)
 conf.setAppName(dispatcherArgs.name)
 dispatcherArgs.zookeeperUrl.foreach { z =>
-  conf.set("spark.deploy.recoveryMode", "ZOOKEEPER")
-  conf.set("spark.deploy.zookeeper.url", z)
+  conf.set(RECOVERY_MODE, "ZOOKEEPER")
+  conf.set(ZOOKEEPER_URL, z)
 }
 val dispatcher = new MesosClusterDispatcher(dispatcherArgs, conf)
 dispatcher.start()

http://git-wip-us.apache.org/repos/asf/spark/blob/ec6f479b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
--
diff --git 
a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
 
b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
index 6b297c4..859aa83 100644
--- 
a/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
+++ 
b/mesos/src/main/scala/org/apache/spark/deploy/mesos/MesosExternalShuffleService.scala
@@ -24,6 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.{SecurityManager, SparkConf}
 import org.apache.spark.deploy.ExternalShuffleService
+import org.apache.spark.deploy.mesos.config._
 import org.apache.spark.internal.Logging
 import org.apache.spark.network.client.{RpcResponseCallback, TransportClient}
 import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler
@@ -114,7 +115,7 @@ private[mesos] class MesosExternalShuffleService(conf: 
SparkConf, securityManage
 
   protected override def newShuffleBlockHandler(
   conf: TransportConf): ExternalShuffleBlockHandler = {
-val cleanerIntervalS = 
this.conf.getTimeAsSeconds("spark.shuffle.cleaner.interval", "30s")
+val cleanerIntervalS = this.conf.get(SHUFFLE_CLEANER_INTERVAL_S)
 new MesosExternalShuffleBlockHandler(conf, c

spark git commit: [SPARK-15994][MESOS] Allow enabling Mesos fetch cache in coarse executor backend

2016-11-01 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master cb80edc26 -> e34b4e126


[SPARK-15994][MESOS] Allow enabling Mesos fetch cache in coarse executor backend

Mesos 0.23.0 introduces a Fetch Cache feature 
http://mesos.apache.org/documentation/latest/fetcher/ which allows caching of 
resources specified in command URIs.

This patch:
- Updates the Mesos shaded protobuf dependency to 0.23.0
- Allows setting `spark.mesos.fetcherCache.enable` to enable the fetch cache 
for all specified URIs. (URIs must be specified for the setting to have any 
effect)
- Updates documentation for Mesos configuration with the new setting.

This patch does NOT:
- Allow for per-URI caching configuration. The cache setting is global to ALL 
URIs for the command.

Author: Charles Allen 

Closes #13713 from drcrallen/SPARK15994.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e34b4e12
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e34b4e12
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e34b4e12

Branch: refs/heads/master
Commit: e34b4e12673fb76c92f661d7c03527410857a0f8
Parents: cb80edc
Author: Charles Allen 
Authored: Tue Nov 1 13:14:17 2016 +
Committer: Sean Owen 
Committed: Tue Nov 1 13:14:17 2016 +

--
 docs/running-on-mesos.md|  9 +--
 .../cluster/mesos/MesosClusterScheduler.scala   |  3 ++-
 .../MesosCoarseGrainedSchedulerBackend.scala|  6 +++--
 .../cluster/mesos/MesosSchedulerUtils.scala |  6 +++--
 ...esosCoarseGrainedSchedulerBackendSuite.scala | 28 
 5 files changed, 45 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e34b4e12/docs/running-on-mesos.md
--
diff --git a/docs/running-on-mesos.md b/docs/running-on-mesos.md
index 77b06fc..923d8db 100644
--- a/docs/running-on-mesos.md
+++ b/docs/running-on-mesos.md
@@ -506,8 +506,13 @@ See the [configuration page](configuration.html) for 
information on Spark config
 since this configuration is just a upper limit and not a guaranteed amount.
   
 
-
-
+<tr>
+  <td><code>spark.mesos.fetcherCache.enable</code></td>
+  <td>false</td>
+  <td>
+    If set to `true`, all URIs (example: `spark.executor.uri`, 
`spark.mesos.uris`) will be cached by the [Mesos fetcher 
cache](http://mesos.apache.org/documentation/latest/fetcher/)
+  </td>
+</tr>
 
 
 # Troubleshooting and Debugging

http://git-wip-us.apache.org/repos/asf/spark/blob/e34b4e12/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
--
diff --git 
a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
 
b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
index 0b45499..635712c 100644
--- 
a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
+++ 
b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala
@@ -129,6 +129,7 @@ private[spark] class MesosClusterScheduler(
   private val queuedCapacity = conf.getInt("spark.mesos.maxDrivers", 200)
   private val retainedDrivers = conf.getInt("spark.mesos.retainedDrivers", 200)
   private val maxRetryWaitTime = 
conf.getInt("spark.mesos.cluster.retry.wait.max", 60) // 1 minute
+  private val useFetchCache = conf.getBoolean("spark.mesos.fetchCache.enable", 
false)
   private val schedulerState = engineFactory.createEngine("scheduler")
   private val stateLock = new Object()
   private val finishedDrivers =
@@ -396,7 +397,7 @@ private[spark] class MesosClusterScheduler(
 val jarUrl = desc.jarUrl.stripPrefix("file:").stripPrefix("local:")
 
 ((jarUrl :: confUris) ++ getDriverExecutorURI(desc).toList).map(uri =>
-  CommandInfo.URI.newBuilder().setValue(uri.trim()).build())
+  
CommandInfo.URI.newBuilder().setValue(uri.trim()).setCache(useFetchCache).build())
   }
 
   private def getDriverCommandValue(desc: MesosDriverDescription): String = {

http://git-wip-us.apache.org/repos/asf/spark/blob/e34b4e12/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
--
diff --git 
a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
 
b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
index e67bf3e..5063c1f 100644
--- 
a/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
+++ 
b/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosCoarseGrainedSchedulerBackend.scala
@@ -59,6 +59,8 @@ private[spark] class Mesos

spark git commit: [SPARK-18111][SQL] Wrong ApproximatePercentile answer when multiple records have the minimum value

2016-11-01 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 623fc7fc6 -> cb80edc26


[SPARK-18111][SQL] Wrong ApproximatePercentile answer when multiple records 
have the minimum value

## What changes were proposed in this pull request?

When multiple records have the minimum value, the answer of 
ApproximatePercentile is wrong.
## How was this patch tested?

add a test case

Author: wangzhenhua 

Closes #15641 from wzhfy/percentile.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cb80edc2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cb80edc2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cb80edc2

Branch: refs/heads/master
Commit: cb80edc26349e2e358d27fe2ae8e5d6959b77fab
Parents: 623fc7f
Author: wangzhenhua 
Authored: Tue Nov 1 13:11:24 2016 +
Committer: Sean Owen 
Committed: Tue Nov 1 13:11:24 2016 +

--
 .../spark/sql/catalyst/util/QuantileSummaries.scala  |  4 +++-
 .../spark/sql/ApproximatePercentileQuerySuite.scala  | 11 +++
 2 files changed, 14 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cb80edc2/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
index 27928c4..04f4ff2 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/QuantileSummaries.scala
@@ -264,7 +264,9 @@ object QuantileSummaries {
 res.prepend(head)
 // If necessary, add the minimum element:
 val currHead = currentSamples.head
-if (currHead.value < head.value) {
+// don't add the minimum element if `currentSamples` has only one element 
(both `currHead` and
+// `head` point to the same element)
+if (currHead.value <= head.value && currentSamples.length > 1) {
   res.prepend(currentSamples.head)
 }
 res.toArray

http://git-wip-us.apache.org/repos/asf/spark/blob/cb80edc2/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
index 37d7c44..e98092d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/ApproximatePercentileQuerySuite.scala
@@ -64,6 +64,17 @@ class ApproximatePercentileQuerySuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
+  test("percentile_approx, multiple records with the minimum value in a 
partition") {
+withTempView(table) {
+  spark.sparkContext.makeRDD(Seq(1, 1, 2, 1, 1, 3, 1, 1, 4, 1, 1, 5), 
4).toDF("col")
+.createOrReplaceTempView(table)
+  checkAnswer(
+spark.sql(s"SELECT percentile_approx(col, array(0.5)) FROM $table"),
+Row(Seq(1.0D))
+  )
+}
+  }
+
   test("percentile_approx, with different accuracies") {
 
 withTempView(table) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [MINOR][DOC] Remove spaces following slashs

2016-11-01 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master dd85eb544 -> 623fc7fc6


[MINOR][DOC] Remove spaces following slashs

## What changes were proposed in this pull request?

This PR merges multiple lines enumerating items in order to remove the 
redundant spaces following slashes in [Structured Streaming Programming Guide 
in 
2.0.2-rc1](http://people.apache.org/~pwendell/spark-releases/spark-2.0.2-rc1-docs/structured-streaming-programming-guide.html).
- Before: `Scala/ Java/ Python`
- After: `Scala/Java/Python`
## How was this patch tested?

Manually, by running the following commands, because this is a documentation update.

```
cd docs
SKIP_API=1 jekyll build
```

Author: Dongjoon Hyun 

Closes #15686 from dongjoon-hyun/minor_doc_space.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/623fc7fc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/623fc7fc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/623fc7fc

Branch: refs/heads/master
Commit: 623fc7fc67735cfafdb7f527bd3df210987943c6
Parents: dd85eb5
Author: Dongjoon Hyun 
Authored: Tue Nov 1 13:08:49 2016 +
Committer: Sean Owen 
Committed: Tue Nov 1 13:08:49 2016 +

--
 docs/structured-streaming-programming-guide.md | 44 ++---
 1 file changed, 20 insertions(+), 24 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/623fc7fc/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 173fd6e..d838ed3 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -14,10 +14,8 @@ Structured Streaming is a scalable and fault-tolerant stream 
processing engine b
 
 # Quick Example
 Let’s say you want to maintain a running word count of text data received 
from a data server listening on a TCP socket. Let’s see how you can express 
this using Structured Streaming. You can see the full code in 
-[Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala)/
-[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java)/
-[Python]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming/structured_network_wordcount.py).
 And if you 
-[download Spark](http://spark.apache.org/downloads.html), you can directly run 
the example. In any case, let’s walk through the example step-by-step and 
understand how it works. First, we have to import the necessary classes and 
create a local SparkSession, the starting point of all functionalities related 
to Spark.
+[Scala]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredNetworkWordCount.scala)/[Java]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredNetworkWordCount.java)/[Python]({{site.SPARK_GITHUB_URL}}/blob/v{{site.SPARK_VERSION_SHORT}}/examples/src/main/python/sql/streaming/structured_network_wordcount.py).
+And if you [download Spark](http://spark.apache.org/downloads.html), you can 
directly run the example. In any case, let’s walk through the example 
step-by-step and understand how it works. First, we have to import the 
necessary classes and create a local SparkSession, the starting point of all 
functionalities related to Spark.
 
 
 
@@ -409,16 +407,15 @@ Delivering end-to-end exactly-once semantics was one of 
key goals behind the des
 to track the read position in the stream. The engine uses checkpointing and 
write ahead logs to record the offset range of the data being processed in each 
trigger. The streaming sinks are designed to be idempotent for handling 
reprocessing. Together, using replayable sources and idempotent sinks, 
Structured Streaming can ensure **end-to-end exactly-once semantics** under any 
failure.
 
 # API using Datasets and DataFrames
-Since Spark 2.0, DataFrames and Datasets can represent static, bounded data, 
as well as streaming, unbounded data. Similar to static Datasets/DataFrames, 
you can use the common entry point `SparkSession` 
([Scala](api/scala/index.html#org.apache.spark.sql.SparkSession)/
-[Java](api/java/org/apache/spark/sql/SparkSession.html)/
-[Python](api/python/pyspark.sql.html#pyspark.sql.SparkSession) docs) to create 
streaming DataFrames/Datasets from streaming sources, and apply the same 
operations on them as static DataFrames/Datasets. If you are not familiar with 
Dataset

spark git commit: [SPARK-18107][SQL] Insert overwrite statement runs much slower in spark-sql than it does in hive-client

2016-11-01 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master d9d146500 -> dd85eb544


[SPARK-18107][SQL] Insert overwrite statement runs much slower in spark-sql 
than it does in hive-client

## What changes were proposed in this pull request?

As reported on the jira, insert overwrite statement runs much slower in Spark, 
compared with hive-client.

It seems there is a patch 
[HIVE-11940](https://github.com/apache/hive/commit/ba21806b77287e237e1aa68fa169d2a81e07346d)
 which largely improves insert overwrite performance on Hive. HIVE-11940 is 
patched after Hive 2.0.0.

Because Spark SQL uses older Hive library, we can not benefit from such 
improvement.

The reporter verified that there is also a big performance gap between Hive 
1.2.1 (520.037 secs) and Hive 2.0.1 (35.975 secs) on insert overwrite execution.

Instead of upgrading to Hive 2.0 in Spark SQL, which might not be a trivial 
task, this patch provides an approach to delete the partition before asking 
Hive to load data files into the partition.

Note: The case reported on the jira is insert overwrite to a partition. Since 
`Hive.loadTable` also uses the same function to replace files, insert overwrite 
to a table should have the same issue. We can take the same approach and delete 
the table first. I will update this patch to include that.
## How was this patch tested?

Jenkins tests.

There are existing tests that use the insert overwrite statement. Those tests 
should pass. I added a new test that specifically tests insert overwrite into a 
partition.

For performance issue, as I don't have Hive 2.0 environment, this needs the 
reporter to verify it. Please refer to the jira.

Please review 
https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark before 
opening a pull request.

Author: Liang-Chi Hsieh 

Closes #15667 from viirya/improve-hive-insertoverwrite.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dd85eb54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dd85eb54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dd85eb54

Branch: refs/heads/master
Commit: dd85eb5448c8f2672260b57e94c0da0eaac12616
Parents: d9d1465
Author: Liang-Chi Hsieh 
Authored: Tue Nov 1 00:24:08 2016 -0700
Committer: Reynold Xin 
Committed: Tue Nov 1 00:24:08 2016 -0700

--
 .../hive/execution/InsertIntoHiveTable.scala| 24 +-
 .../sql/hive/execution/SQLQuerySuite.scala  | 33 
 2 files changed, 56 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dd85eb54/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
index c3c4e29..2843100 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/InsertIntoHiveTable.scala
@@ -37,6 +37,7 @@ import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.physical.Partitioning
 import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}
+import org.apache.spark.sql.execution.command.{AlterTableAddPartitionCommand, 
AlterTableDropPartitionCommand}
 import org.apache.spark.sql.hive._
 import org.apache.spark.sql.hive.HiveShim.{ShimFileSinkDesc => FileSinkDesc}
 import org.apache.spark.SparkException
@@ -257,7 +258,28 @@ case class InsertIntoHiveTable(
 table.catalogTable.identifier.table,
 partitionSpec)
 
+var doHiveOverwrite = overwrite
+
 if (oldPart.isEmpty || !ifNotExists) {
+  // SPARK-18107: Insert overwrite runs much slower than hive-client.
+  // Newer Hive largely improves insert overwrite performance. As 
Spark uses older Hive
+  // version and we may not want to catch up new Hive version every 
time. We delete the
+  // Hive partition first and then load data file into the Hive 
partition.
+  if (oldPart.nonEmpty && overwrite) {
+oldPart.get.storage.locationUri.map { uri =>
+  val partitionPath = new Path(uri)
+  val fs = partitionPath.getFileSystem(hadoopConf)
+  if (fs.exists(partitionPath)) {
+if (!fs.delete(partitionPath, true)) {
+  throw new RuntimeException(
+"Cannot remove partition directory '" + 
partitionPath.toString)
+}
+// Don't let Hive do overwrite operation since it is slower.
+doHiveOverwrite = false
+