spark git commit: [SPARK-16331][SQL] Reduce code generation time

2016-06-30 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master aa6564f37 -> 14cf61e90


[SPARK-16331][SQL] Reduce code generation time

## What changes were proposed in this pull request?
During code generation, a `LocalRelation` often has a huge `Vector` object as 
`data`. In the simple example below, a `LocalRelation` holds a `Vector` of 100 
`UnsafeRow` elements.

```
val numRows = 100
val ds = (1 to numRows).toDS().persist()
benchmark.addCase("filter+reduce") { iter =>
  ds.filter(a => (a & 1) == 0).reduce(_ + _)
}
```

In `TreeNode.transformChildren`, all elements of the vector are unnecessarily 
iterated to check whether any children exist in the vector, since `Vector` is 
a `Traversable`. This part significantly increases code generation time.

This patch avoids this overhead by checking the number of children before 
iterating all elements; `LocalRelation` does not have children since it extends 
`LeafNode`.
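
For intuition, here is a minimal, self-contained sketch of the shortcut (it is 
not the actual Catalyst code; `FakeNode` is a stand-in for `TreeNode`):

```
case class FakeNode(children: Seq[FakeNode], data: Vector[Int]) {
  def transformArgs(rule: FakeNode => FakeNode): FakeNode = {
    if (children.isEmpty) {
      this                                  // leaf-node shortcut added by this patch
    } else {
      copy(children = children.map(rule))   // only non-leaf nodes pay for the traversal
    }
  }
}

val leaf = FakeNode(Nil, Vector.fill(1000000)(0))
leaf.transformArgs(identity)                // returns immediately; `data` is never scanned
```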

The performance of the above example:
```
without this patch
Java HotSpot(TM) 64-Bit Server VM 1.8.0_91-b14 on Mac OS X 10.11.5
Intel(R) Core(TM) i5-5257U CPU @ 2.70GHz

compilationTime:      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------
filter+reduce              4426 / 4533          0.2        4426.0        1.0X

with this patch

compilationTime:      Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------
filter+reduce              3117 / 3391          0.3        3116.6        1.0X
```

## How was this patch tested?

using existing unit tests

Author: Hiroshi Inoue 

Closes #14000 from inouehrs/compilation-time-reduction.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/14cf61e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/14cf61e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/14cf61e9

Branch: refs/heads/master
Commit: 14cf61e909598d9f6b9c3b920de7299e9bc828e0
Parents: aa6564f
Author: Hiroshi Inoue 
Authored: Thu Jun 30 21:47:44 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jun 30 21:47:44 2016 -0700

--
 .../spark/sql/catalyst/trees/TreeNode.scala | 82 ++--
 1 file changed, 43 insertions(+), 39 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/14cf61e9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index 072445a..8bce404 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -315,25 +315,9 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
   protected def transformChildren(
       rule: PartialFunction[BaseType, BaseType],
       nextOperation: (BaseType, PartialFunction[BaseType, BaseType]) => BaseType): BaseType = {
-    var changed = false
-    val newArgs = mapProductIterator {
-      case arg: TreeNode[_] if containsChild(arg) =>
-        val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
-        if (!(newChild fastEquals arg)) {
-          changed = true
-          newChild
-        } else {
-          arg
-        }
-      case Some(arg: TreeNode[_]) if containsChild(arg) =>
-        val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
-        if (!(newChild fastEquals arg)) {
-          changed = true
-          Some(newChild)
-        } else {
-          Some(arg)
-        }
-      case m: Map[_, _] => m.mapValues {
+    if (children.nonEmpty) {
+      var changed = false
+      val newArgs = mapProductIterator {
         case arg: TreeNode[_] if containsChild(arg) =>
           val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)
           if (!(newChild fastEquals arg)) {
@@ -342,33 +326,53 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product {
           } else {
             arg
           }
-        case other => other
-      }.view.force // `mapValues` is lazy and we need to force it to materialize
-      case d: DataType => d // Avoid unpacking Structs
-      case args: Traversable[_] => args.map {
-        case arg: TreeNode[_] if containsChild(arg) =>
+        case Some(arg: TreeNode[_]) if containsChild(arg) =>
           val newChild = nextOperation(arg.asInstanceOf[BaseType], rule)

spark git commit: [SPARK-14608][ML] transformSchema needs better documentation

2016-06-30 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 80a7bff89 -> cc3c44b11


[SPARK-14608][ML] transformSchema needs better documentation

## What changes were proposed in this pull request?
jira: https://issues.apache.org/jira/browse/SPARK-14608
PipelineStage.transformSchema currently has minimal documentation. It should 
be expanded to explain that it can:
- check the schema
- check parameter interactions
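
For illustration, a minimal sketch of what such a `transformSchema` typically 
does (hypothetical column parameters, not code from this PR):

```
import org.apache.spark.sql.types.{DoubleType, StructField, StructType}

// Hypothetical sketch of a typical transformSchema; `inputCol`/`outputCol`
// stand in for real Params of a pipeline stage.
def transformSchema(schema: StructType, inputCol: String, outputCol: String): StructType = {
  // 1. Check the schema: the input column must exist and have a supported type.
  require(schema.fieldNames.contains(inputCol), s"Column $inputCol does not exist.")
  require(schema(inputCol).dataType == DoubleType,
    s"Column $inputCol must be DoubleType but is ${schema(inputCol).dataType}.")
  // 2. Check parameter interactions, e.g. that the output column does not clash.
  require(inputCol != outputCol, "Input and output column names must differ.")
  // 3. Derive the output schema from the input schema.
  StructType(schema.fields :+ StructField(outputCol, DoubleType, nullable = false))
}
```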

## How was this patch tested?
unit test

Author: Yuhao Yang 
Author: Yuhao Yang 

Closes #12384 from hhbyyh/transformSchemaDoc.

(cherry picked from commit aa6564f37f1d8de77c3b7bfa885000252efffea6)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc3c44b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc3c44b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc3c44b1

Branch: refs/heads/branch-2.0
Commit: cc3c44b1196c4186c0b55e319460524e9b9f865b
Parents: 80a7bff
Author: Yuhao Yang 
Authored: Thu Jun 30 19:34:51 2016 -0700
Committer: Joseph K. Bradley 
Committed: Thu Jun 30 19:35:06 2016 -0700

--
 mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/cc3c44b1/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala 
b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index 25e56d7..a1d08b3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -44,7 +44,10 @@ abstract class PipelineStage extends Params with Logging {
   /**
    * :: DeveloperApi ::
    *
-   * Derives the output schema from the input schema.
+   * Check transform validity and derive the output schema from the input schema.
+   *
+   * Typical implementation should first conduct verification on schema change and parameter
+   * validity, including complex parameter interaction checks.
    */
   @DeveloperApi
   def transformSchema(schema: StructType): StructType





spark git commit: [SPARK-14608][ML] transformSchema needs better documentation

2016-06-30 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 38f4d6f44 -> aa6564f37


[SPARK-14608][ML] transformSchema needs better documentation

## What changes were proposed in this pull request?
jira: https://issues.apache.org/jira/browse/SPARK-14608
PipelineStage.transformSchema currently has minimal documentation. It should 
be expanded to explain that it can:
- check the schema
- check parameter interactions

## How was this patch tested?
unit test

Author: Yuhao Yang 
Author: Yuhao Yang 

Closes #12384 from hhbyyh/transformSchemaDoc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/aa6564f3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/aa6564f3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/aa6564f3

Branch: refs/heads/master
Commit: aa6564f37f1d8de77c3b7bfa885000252efffea6
Parents: 38f4d6f
Author: Yuhao Yang 
Authored: Thu Jun 30 19:34:51 2016 -0700
Committer: Joseph K. Bradley 
Committed: Thu Jun 30 19:34:51 2016 -0700

--
 mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala | 5 -
 1 file changed, 4 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/aa6564f3/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
--
diff --git a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala 
b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
index 25e56d7..a1d08b3 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/Pipeline.scala
@@ -44,7 +44,10 @@ abstract class PipelineStage extends Params with Logging {
   /**
    * :: DeveloperApi ::
    *
-   * Derives the output schema from the input schema.
+   * Check transform validity and derive the output schema from the input schema.
+   *
+   * Typical implementation should first conduct verification on schema change and parameter
+   * validity, including complex parameter interaction checks.
    */
   @DeveloperApi
   def transformSchema(schema: StructType): StructType





spark git commit: [SPARK-15820][PYSPARK][SQL] Add Catalog.refreshTable into python API

2016-06-30 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 94d61de9c -> 80a7bff89


[SPARK-15820][PYSPARK][SQL] Add Catalog.refreshTable into python API

## What changes were proposed in this pull request?

Add the Catalog.refreshTable API to the Python interface of Spark SQL.
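
The new method delegates through Py4J to the existing JVM Catalog API. For 
reference, the Scala counterpart looks like this (a sketch assuming an active 
SparkSession `spark`; the table name is illustrative):

```
// Invalidate and refresh the cached metadata of a table after its data
// changed outside Spark SQL.
spark.catalog.refreshTable("my_table")
```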

## How was this patch tested?

Existing test.

Author: WeichenXu 

Closes #13558 from WeichenXu123/update_python_sql_interface_refreshTable.

(cherry picked from commit 5344bade8efb6f12aa43fbfbbbc2e3c0c7d16d98)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/80a7bff8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/80a7bff8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/80a7bff8

Branch: refs/heads/branch-2.0
Commit: 80a7bff897554ce77fe6bc91d62cff8857892322
Parents: 94d61de
Author: WeichenXu 
Authored: Thu Jun 30 23:00:39 2016 +0800
Committer: Cheng Lian 
Committed: Fri Jul 1 10:18:44 2016 +0800

--
 python/pyspark/sql/catalog.py   | 5 +
 .../src/main/scala/org/apache/spark/sql/catalog/Catalog.scala   | 2 +-
 2 files changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/80a7bff8/python/pyspark/sql/catalog.py
--
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 3033f14..4af930a 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -232,6 +232,11 @@ class Catalog(object):
         """Removes all cached tables from the in-memory cache."""
         self._jcatalog.clearCache()
 
+    @since(2.0)
+    def refreshTable(self, tableName):
+        """Invalidate and refresh all the cached metadata of the given table."""
+        self._jcatalog.refreshTable(tableName)
+
     def _reset(self):
         """(Internal use only) Drop all existing databases (except "default"), tables,
         partitions and functions, and set the current database to "default".

http://git-wip-us.apache.org/repos/asf/spark/blob/80a7bff8/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 083a63c..91ed9b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -214,7 +214,7 @@ abstract class Catalog {
   def clearCache(): Unit
 
   /**
-   * Invalidate and refresh all the cached the metadata of the given table. For performance reasons,
+   * Invalidate and refresh all the cached metadata of the given table. For performance reasons,
    * Spark SQL or the external data source library it uses might cache certain metadata about a
    * table, such as the location of blocks. When those change outside of Spark SQL, users should
    * call this function to invalidate the cache.





spark git commit: [SPARK-15954][SQL] Disable loading test tables in Python tests

2016-06-30 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 4a981dc87 -> 38f4d6f44


[SPARK-15954][SQL] Disable loading test tables in Python tests

## What changes were proposed in this pull request?
This patch introduces a flag to disable loading test tables in 
TestHiveSparkSession and disables that in Python. This fixes an issue in which 
python/run-tests would fail due to failure to load test tables.

Note that these test tables are not used outside of HiveCompatibilitySuite. In 
the long run we should probably decouple the loading of test tables from the 
test Hive setup.
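
Based on the new constructor in the diff below, a JVM caller can opt out of the 
test tables like this (a sketch; Python reaches the same constructor through 
Py4J and passes `False`):

```
import org.apache.spark.SparkContext
import org.apache.spark.sql.hive.test.TestHiveContext

// Sketch: skip loading the HiveCompatibilitySuite-only test tables.
def makeTestHiveContext(sc: SparkContext): TestHiveContext =
  new TestHiveContext(sc, loadTestTables = false)
```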

## How was this patch tested?
This is a test only change.

Author: Reynold Xin 

Closes #14005 from rxin/SPARK-15954.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/38f4d6f4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/38f4d6f4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/38f4d6f4

Branch: refs/heads/master
Commit: 38f4d6f44eaa03bdc703662e4a7be9c09ba86e16
Parents: 4a981dc
Author: Reynold Xin 
Authored: Thu Jun 30 19:02:35 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jun 30 19:02:35 2016 -0700

--
 python/pyspark/sql/context.py   |   2 +-
 .../apache/spark/sql/hive/test/TestHive.scala   | 344 ++-
 2 files changed, 185 insertions(+), 161 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/38f4d6f4/python/pyspark/sql/context.py
--
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 8c984b3..4cfdf79 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -492,7 +492,7 @@ class HiveContext(SQLContext):
         confusing error messages.
         """
         jsc = sparkContext._jsc.sc()
-        jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc)
+        jtestHive = sparkContext._jvm.org.apache.spark.sql.hive.test.TestHiveContext(jsc, False)
         return cls(sparkContext, jtestHive)
 
     def refreshTable(self, tableName):

http://git-wip-us.apache.org/repos/asf/spark/blob/38f4d6f4/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
--
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
index b45be02..7f89204 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/test/TestHive.scala
@@ -73,8 +73,12 @@ class TestHiveContext(
     @transient override val sparkSession: TestHiveSparkSession)
   extends SQLContext(sparkSession) {
 
-  def this(sc: SparkContext) {
-    this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc)))
+  /**
+   * If loadTestTables is false, no test tables are loaded. Note that this flag can only be true
+   * when running in the JVM, i.e. it needs to be false when calling from Python.
+   */
+  def this(sc: SparkContext, loadTestTables: Boolean = true) {
+    this(new TestHiveSparkSession(HiveUtils.withHiveExternalCatalog(sc), loadTestTables))
   }
 
   override def newSession(): TestHiveContext = {
@@ -103,13 +107,24 @@ class TestHiveContext(
 
 }
 
-
+/**
+ * A [[SparkSession]] used in [[TestHiveContext]].
+ *
+ * @param sc SparkContext
+ * @param warehousePath path to the Hive warehouse directory
+ * @param scratchDirPath scratch directory used by Hive's metastore client
+ * @param metastoreTemporaryConf configuration options for Hive's metastore
+ * @param existingSharedState optional [[TestHiveSharedState]]
+ * @param loadTestTables if true, load the test tables. They can only be loaded when running
+ *                       in the JVM, i.e when calling from Python this flag has to be false.
+ */
 private[hive] class TestHiveSparkSession(
     @transient private val sc: SparkContext,
     val warehousePath: File,
     scratchDirPath: File,
     metastoreTemporaryConf: Map[String, String],
-    @transient private val existingSharedState: Option[TestHiveSharedState])
+    @transient private val existingSharedState: Option[TestHiveSharedState],
+    private val loadTestTables: Boolean)
   extends SparkSession(sc) with Logging { self =>
 
   // TODO: We need to set the temp warehouse path to sc's conf.
@@ -118,13 +133,14 @@ private[hive] class TestHiveSparkSession(
   // when we creating metadataHive. This flow is not easy to follow and can introduce
   // confusion when a developer is debugging an issue. We need to refactor this part
   // to just set the temp warehouse path in sc's conf.
-  def this(sc: SparkContext) {
+  def 

spark git commit: [SPARK-15643][DOC][ML] Add breaking changes to ML migration guide

2016-06-30 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master dab105161 -> 4a981dc87


[SPARK-15643][DOC][ML] Add breaking changes to ML migration guide

This PR adds the breaking changes from 
[SPARK-14810](https://issues.apache.org/jira/browse/SPARK-14810) to the 
migration guide.

## How was this patch tested?

Built docs locally.

Author: Nick Pentreath 

Closes #13924 from MLnick/SPARK-15643-migration-guide.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a981dc8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a981dc8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a981dc8

Branch: refs/heads/master
Commit: 4a981dc870a31d8b90aac5f6cb22884e02f6fbc6
Parents: dab1051
Author: Nick Pentreath 
Authored: Thu Jun 30 17:55:14 2016 -0700
Committer: Joseph K. Bradley 
Committed: Thu Jun 30 17:55:14 2016 -0700

--
 docs/mllib-guide.md | 104 +--
 1 file changed, 101 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4a981dc8/docs/mllib-guide.md
--
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index c28d137..17fd3e1 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -104,9 +104,105 @@ and the migration guide below will explain all changes 
between releases.
 
 ## From 1.6 to 2.0
 
-The deprecations and changes of behavior in the `spark.mllib` or `spark.ml` 
packages include:
+### Breaking changes
 
-Deprecations:
+There were several breaking changes in Spark 2.0, which are outlined below.
+
+**Linear algebra classes for DataFrame-based APIs**
+
+Spark's linear algebra dependencies were moved to a new project, `mllib-local` 
+(see [SPARK-13944](https://issues.apache.org/jira/browse/SPARK-13944)). 
+As part of this change, the linear algebra classes were copied to a new 
package, `spark.ml.linalg`. 
+The DataFrame-based APIs in `spark.ml` now depend on the `spark.ml.linalg` 
classes, 
+leading to a few breaking changes, predominantly in various model classes 
+(see [SPARK-14810](https://issues.apache.org/jira/browse/SPARK-14810) for a 
full list).
+
+**Note:** the RDD-based APIs in `spark.mllib` continue to depend on the 
previous package `spark.mllib.linalg`.
+
+_Converting vectors and matrices_
+
+While most pipeline components support backward compatibility for loading, 
+some existing `DataFrames` and pipelines in Spark versions prior to 2.0, that 
contain vector or matrix 
+columns, may need to be migrated to the new `spark.ml` vector and matrix 
types. 
+Utilities for converting `DataFrame` columns from `spark.mllib.linalg` to 
`spark.ml.linalg` types
+(and vice versa) can be found in `spark.mllib.util.MLUtils`.
+
+There are also utility methods available for converting single instances of 
+vectors and matrices. Use the `asML` method on a `mllib.linalg.Vector` / 
`mllib.linalg.Matrix`
+for converting to `ml.linalg` types, and 
+`mllib.linalg.Vectors.fromML` / `mllib.linalg.Matrices.fromML` 
+for converting to `mllib.linalg` types.
+
+
+
+
+{% highlight scala %}
+import org.apache.spark.mllib.util.MLUtils
+
+// convert DataFrame columns
+val convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF)
+val convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF)
+// convert a single vector or matrix
+val mlVec: org.apache.spark.ml.linalg.Vector = mllibVec.asML
+val mlMat: org.apache.spark.ml.linalg.Matrix = mllibMat.asML
+{% endhighlight %}
+
+Refer to the [`MLUtils` Scala 
docs](api/scala/index.html#org.apache.spark.mllib.util.MLUtils$) for further 
detail.
+
+
+
+
+{% highlight java %}
+import org.apache.spark.mllib.util.MLUtils;
+import org.apache.spark.sql.Dataset;
+
+// convert DataFrame columns
+Dataset convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF);
+Dataset convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF);
+// convert a single vector or matrix
+org.apache.spark.ml.linalg.Vector mlVec = mllibVec.asML();
+org.apache.spark.ml.linalg.Matrix mlMat = mllibMat.asML();
+{% endhighlight %}
+
+Refer to the [`MLUtils` Java 
docs](api/java/org/apache/spark/mllib/util/MLUtils.html) for further detail.
+
+
+
+
+{% highlight python %}
+from pyspark.mllib.util import MLUtils
+
+# convert DataFrame columns
+convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF)
+convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF)
+# convert a single vector or matrix
+mlVec = mllibVec.asML()
+mlMat = mllibMat.asML()
+{% endhighlight %}
+
+Refer to the [`MLUtils` Python 
docs](api/python/pyspark.mllib.html#pyspark.mllib.util.MLUtils) for further 
detail.
+
+
+
+**Deprecated methods removed**
+
+Several deprecated methods were removed in the `spark.mllib` and 

spark git commit: [SPARK-15643][DOC][ML] Add breaking changes to ML migration guide

2016-06-30 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 d3027c45f -> 79c96c999


[SPARK-15643][DOC][ML] Add breaking changes to ML migration guide

This PR adds the breaking changes from 
[SPARK-14810](https://issues.apache.org/jira/browse/SPARK-14810) to the 
migration guide.

## How was this patch tested?

Built docs locally.

Author: Nick Pentreath 

Closes #13924 from MLnick/SPARK-15643-migration-guide.

(cherry picked from commit 4a981dc870a31d8b90aac5f6cb22884e02f6fbc6)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/79c96c99
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/79c96c99
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/79c96c99

Branch: refs/heads/branch-2.0
Commit: 79c96c99977b0478c25b13583a3e88cbab541ba6
Parents: d3027c4
Author: Nick Pentreath 
Authored: Thu Jun 30 17:55:14 2016 -0700
Committer: Joseph K. Bradley 
Committed: Thu Jun 30 17:55:37 2016 -0700

--
 docs/mllib-guide.md | 104 +--
 1 file changed, 101 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/79c96c99/docs/mllib-guide.md
--
diff --git a/docs/mllib-guide.md b/docs/mllib-guide.md
index c28d137..17fd3e1 100644
--- a/docs/mllib-guide.md
+++ b/docs/mllib-guide.md
@@ -104,9 +104,105 @@ and the migration guide below will explain all changes 
between releases.
 
 ## From 1.6 to 2.0
 
-The deprecations and changes of behavior in the `spark.mllib` or `spark.ml` 
packages include:
+### Breaking changes
 
-Deprecations:
+There were several breaking changes in Spark 2.0, which are outlined below.
+
+**Linear algebra classes for DataFrame-based APIs**
+
+Spark's linear algebra dependencies were moved to a new project, `mllib-local` 
+(see [SPARK-13944](https://issues.apache.org/jira/browse/SPARK-13944)). 
+As part of this change, the linear algebra classes were copied to a new 
package, `spark.ml.linalg`. 
+The DataFrame-based APIs in `spark.ml` now depend on the `spark.ml.linalg` 
classes, 
+leading to a few breaking changes, predominantly in various model classes 
+(see [SPARK-14810](https://issues.apache.org/jira/browse/SPARK-14810) for a 
full list).
+
+**Note:** the RDD-based APIs in `spark.mllib` continue to depend on the 
previous package `spark.mllib.linalg`.
+
+_Converting vectors and matrices_
+
+While most pipeline components support backward compatibility for loading, 
+some existing `DataFrames` and pipelines in Spark versions prior to 2.0, that 
contain vector or matrix 
+columns, may need to be migrated to the new `spark.ml` vector and matrix 
types. 
+Utilities for converting `DataFrame` columns from `spark.mllib.linalg` to 
`spark.ml.linalg` types
+(and vice versa) can be found in `spark.mllib.util.MLUtils`.
+
+There are also utility methods available for converting single instances of 
+vectors and matrices. Use the `asML` method on a `mllib.linalg.Vector` / 
`mllib.linalg.Matrix`
+for converting to `ml.linalg` types, and 
+`mllib.linalg.Vectors.fromML` / `mllib.linalg.Matrices.fromML` 
+for converting to `mllib.linalg` types.
+
+
+
+
+{% highlight scala %}
+import org.apache.spark.mllib.util.MLUtils
+
+// convert DataFrame columns
+val convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF)
+val convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF)
+// convert a single vector or matrix
+val mlVec: org.apache.spark.ml.linalg.Vector = mllibVec.asML
+val mlMat: org.apache.spark.ml.linalg.Matrix = mllibMat.asML
+{% endhighlight %}
+
+Refer to the [`MLUtils` Scala 
docs](api/scala/index.html#org.apache.spark.mllib.util.MLUtils$) for further 
detail.
+
+
+
+
+{% highlight java %}
+import org.apache.spark.mllib.util.MLUtils;
+import org.apache.spark.sql.Dataset;
+
+// convert DataFrame columns
+Dataset convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF);
+Dataset convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF);
+// convert a single vector or matrix
+org.apache.spark.ml.linalg.Vector mlVec = mllibVec.asML();
+org.apache.spark.ml.linalg.Matrix mlMat = mllibMat.asML();
+{% endhighlight %}
+
+Refer to the [`MLUtils` Java 
docs](api/java/org/apache/spark/mllib/util/MLUtils.html) for further detail.
+
+
+
+
+{% highlight python %}
+from pyspark.mllib.util import MLUtils
+
+# convert DataFrame columns
+convertedVecDF = MLUtils.convertVectorColumnsToML(vecDF)
+convertedMatrixDF = MLUtils.convertMatrixColumnsToML(matrixDF)
+# convert a single vector or matrix
+mlVec = mllibVec.asML()
+mlMat = mllibMat.asML()
+{% endhighlight %}
+
+Refer to the [`MLUtils` Python 

spark git commit: [SPARK-16328][ML][MLLIB][PYSPARK] Add 'asML' and 'fromML' conversion methods to PySpark linalg

2016-06-30 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 17c7522c8 -> d3027c45f


[SPARK-16328][ML][MLLIB][PYSPARK] Add 'asML' and 'fromML' conversion methods to 
PySpark linalg

The move to `ml.linalg` created `asML`/`fromML` utility methods in Scala/Java 
for converting between representations. These are missing in Python; this PR 
adds them.
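
For context, this is how the existing Scala counterparts are used; the PR adds 
the equivalent `asML` and `Vectors.fromML`/`Matrices.fromML` methods to 
`pyspark.mllib.linalg`:

```
import org.apache.spark.mllib.linalg.{Vectors => OldVectors}
import org.apache.spark.ml.linalg.{Vector => NewVector}

val oldVec = OldVectors.dense(1.0, 2.0, 3.0)
val newVec: NewVector = oldVec.asML         // mllib.linalg -> ml.linalg (copies references, not data)
val backAgain = OldVectors.fromML(newVec)   // ml.linalg -> mllib.linalg
```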

## How was this patch tested?

New doctests.

Author: Nick Pentreath 

Closes #13997 from MLnick/SPARK-16328-python-linalg-convert.

(cherry picked from commit dab10516138867b7c4fc6d42168497e82853b539)
Signed-off-by: Joseph K. Bradley 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d3027c45
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d3027c45
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d3027c45

Branch: refs/heads/branch-2.0
Commit: d3027c45fbe02752d260aefff9dae707ba5c5d4c
Parents: 17c7522
Author: Nick Pentreath 
Authored: Thu Jun 30 17:52:15 2016 -0700
Committer: Joseph K. Bradley 
Committed: Thu Jun 30 17:52:29 2016 -0700

--
 python/pyspark/mllib/linalg/__init__.py | 99 
 python/pyspark/mllib/tests.py   | 69 +++
 2 files changed, 168 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/d3027c45/python/pyspark/mllib/linalg/__init__.py
--
diff --git a/python/pyspark/mllib/linalg/__init__.py 
b/python/pyspark/mllib/linalg/__init__.py
index 3a345b2..15dc53a 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -39,6 +39,7 @@ else:
 import numpy as np
 
 from pyspark import since
+from pyspark.ml import linalg as newlinalg
 from pyspark.sql.types import UserDefinedType, StructField, StructType, 
ArrayType, DoubleType, \
 IntegerType, ByteType, BooleanType
 
@@ -247,6 +248,15 @@ class Vector(object):
 """
 raise NotImplementedError
 
+def asML(self):
+"""
+Convert this vector to the new mllib-local representation.
+This does NOT copy the data; it copies references.
+
+:return: :py:class:`pyspark.ml.linalg.Vector`
+"""
+raise NotImplementedError
+
 
 class DenseVector(Vector):
 """
@@ -408,6 +418,17 @@ class DenseVector(Vector):
 """
 return self.array
 
+def asML(self):
+"""
+Convert this vector to the new mllib-local representation.
+This does NOT copy the data; it copies references.
+
+:return: :py:class:`pyspark.ml.linalg.DenseVector`
+
+.. versionadded:: 2.0.0
+"""
+return newlinalg.DenseVector(self.array)
+
 @property
 def values(self):
 """
@@ -737,6 +758,17 @@ class SparseVector(Vector):
 arr[self.indices] = self.values
 return arr
 
+def asML(self):
+"""
+Convert this vector to the new mllib-local representation.
+This does NOT copy the data; it copies references.
+
+:return: :py:class:`pyspark.ml.linalg.SparseVector`
+
+.. versionadded:: 2.0.0
+"""
+return newlinalg.SparseVector(self.size, self.indices, self.values)
+
 def __len__(self):
 return self.size
 
@@ -846,6 +878,24 @@ class Vectors(object):
 return DenseVector(elements)
 
 @staticmethod
+def fromML(vec):
+"""
+Convert a vector from the new mllib-local representation.
+This does NOT copy the data; it copies references.
+
+:param vec: a :py:class:`pyspark.ml.linalg.Vector`
+:return: a :py:class:`pyspark.mllib.linalg.Vector`
+
+.. versionadded:: 2.0.0
+"""
+if isinstance(vec, newlinalg.DenseVector):
+return DenseVector(vec.array)
+elif isinstance(vec, newlinalg.SparseVector):
+return SparseVector(vec.size, vec.indices, vec.values)
+else:
+raise TypeError("Unsupported vector type %s" % type(vec))
+
+@staticmethod
 def stringify(vector):
 """
 Converts a vector into a string, which can be recognized by
@@ -945,6 +995,13 @@ class Matrix(object):
 """
 raise NotImplementedError
 
+def asML(self):
+"""
+Convert this matrix to the new mllib-local representation.
+This does NOT copy the data; it copies references.
+"""
+raise NotImplementedError
+
 @staticmethod
 def _convert_to_array(array_like, dtype):
 """
@@ -1044,6 +1101,17 @@ class DenseMatrix(Matrix):
 
 return SparseMatrix(self.numRows, self.numCols, colPtrs, rowIndices, 
values)
 
+def asML(self):
+"""
+Convert this matrix to the new mllib-local 

spark git commit: [SPARK-16328][ML][MLLIB][PYSPARK] Add 'asML' and 'fromML' conversion methods to PySpark linalg

2016-06-30 Thread jkbradley
Repository: spark
Updated Branches:
  refs/heads/master 85f2303ec -> dab105161


[SPARK-16328][ML][MLLIB][PYSPARK] Add 'asML' and 'fromML' conversion methods to 
PySpark linalg

The move to `ml.linalg` created `asML`/`fromML` utility methods in Scala/Java 
for converting between representations. These are missing in Python; this PR 
adds them.

## How was this patch tested?

New doctests.

Author: Nick Pentreath 

Closes #13997 from MLnick/SPARK-16328-python-linalg-convert.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dab10516
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dab10516
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dab10516

Branch: refs/heads/master
Commit: dab10516138867b7c4fc6d42168497e82853b539
Parents: 85f2303
Author: Nick Pentreath 
Authored: Thu Jun 30 17:52:15 2016 -0700
Committer: Joseph K. Bradley 
Committed: Thu Jun 30 17:52:15 2016 -0700

--
 python/pyspark/mllib/linalg/__init__.py | 99 
 python/pyspark/mllib/tests.py   | 69 +++
 2 files changed, 168 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dab10516/python/pyspark/mllib/linalg/__init__.py
--
diff --git a/python/pyspark/mllib/linalg/__init__.py 
b/python/pyspark/mllib/linalg/__init__.py
index 3a345b2..15dc53a 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -39,6 +39,7 @@ else:
 import numpy as np
 
 from pyspark import since
+from pyspark.ml import linalg as newlinalg
 from pyspark.sql.types import UserDefinedType, StructField, StructType, 
ArrayType, DoubleType, \
 IntegerType, ByteType, BooleanType
 
@@ -247,6 +248,15 @@ class Vector(object):
 """
 raise NotImplementedError
 
+def asML(self):
+"""
+Convert this vector to the new mllib-local representation.
+This does NOT copy the data; it copies references.
+
+:return: :py:class:`pyspark.ml.linalg.Vector`
+"""
+raise NotImplementedError
+
 
 class DenseVector(Vector):
 """
@@ -408,6 +418,17 @@ class DenseVector(Vector):
 """
 return self.array
 
+def asML(self):
+"""
+Convert this vector to the new mllib-local representation.
+This does NOT copy the data; it copies references.
+
+:return: :py:class:`pyspark.ml.linalg.DenseVector`
+
+.. versionadded:: 2.0.0
+"""
+return newlinalg.DenseVector(self.array)
+
 @property
 def values(self):
 """
@@ -737,6 +758,17 @@ class SparseVector(Vector):
 arr[self.indices] = self.values
 return arr
 
+def asML(self):
+"""
+Convert this vector to the new mllib-local representation.
+This does NOT copy the data; it copies references.
+
+:return: :py:class:`pyspark.ml.linalg.SparseVector`
+
+.. versionadded:: 2.0.0
+"""
+return newlinalg.SparseVector(self.size, self.indices, self.values)
+
 def __len__(self):
 return self.size
 
@@ -846,6 +878,24 @@ class Vectors(object):
 return DenseVector(elements)
 
 @staticmethod
+def fromML(vec):
+"""
+Convert a vector from the new mllib-local representation.
+This does NOT copy the data; it copies references.
+
+:param vec: a :py:class:`pyspark.ml.linalg.Vector`
+:return: a :py:class:`pyspark.mllib.linalg.Vector`
+
+.. versionadded:: 2.0.0
+"""
+if isinstance(vec, newlinalg.DenseVector):
+return DenseVector(vec.array)
+elif isinstance(vec, newlinalg.SparseVector):
+return SparseVector(vec.size, vec.indices, vec.values)
+else:
+raise TypeError("Unsupported vector type %s" % type(vec))
+
+@staticmethod
 def stringify(vector):
 """
 Converts a vector into a string, which can be recognized by
@@ -945,6 +995,13 @@ class Matrix(object):
 """
 raise NotImplementedError
 
+def asML(self):
+"""
+Convert this matrix to the new mllib-local representation.
+This does NOT copy the data; it copies references.
+"""
+raise NotImplementedError
+
 @staticmethod
 def _convert_to_array(array_like, dtype):
 """
@@ -1044,6 +1101,17 @@ class DenseMatrix(Matrix):
 
 return SparseMatrix(self.numRows, self.numCols, colPtrs, rowIndices, 
values)
 
+def asML(self):
+"""
+Convert this matrix to the new mllib-local representation.
+This does NOT copy the data; it copies references.
+
+:return: :py:class:`pyspark.ml.linalg.DenseMatrix`
+

spark git commit: [SPARK-16276][SQL] Implement elt SQL function

2016-06-30 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 3d75a5b2a -> 85f2303ec


[SPARK-16276][SQL] Implement elt SQL function

## What changes were proposed in this pull request?
This patch implements the elt function, as it is implemented in Hive.
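
Per the `ExpressionDescription` in the diff below, `elt(n, str1, str2, ...)` 
returns the n-th string argument. For example (assuming an active SparkSession 
named `spark`):

```
// elt picks the n-th string argument (1-based).
spark.sql("SELECT elt(1, 'scala', 'java')").collect()   // returns 'scala'
spark.sql("SELECT elt(2, 'scala', 'java')").collect()   // returns 'java'
```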

## How was this patch tested?
Added expression unit test in StringExpressionsSuite and end-to-end test in 
StringFunctionsSuite.

Author: petermaxlee 

Closes #13966 from petermaxlee/SPARK-16276.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/85f2303e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/85f2303e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/85f2303e

Branch: refs/heads/master
Commit: 85f2303ecadd9bf6d9694a2743dda075654c5ccf
Parents: 3d75a5b
Author: petermaxlee 
Authored: Fri Jul 1 07:57:48 2016 +0800
Committer: Wenchen Fan 
Committed: Fri Jul 1 07:57:48 2016 +0800

--
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../expressions/ExpectsInputTypes.scala |  3 +-
 .../expressions/stringExpressions.scala | 41 
 .../expressions/StringExpressionsSuite.scala| 23 +++
 .../apache/spark/sql/StringFunctionsSuite.scala | 14 +++
 .../spark/sql/hive/HiveSessionCatalog.scala |  2 +-
 6 files changed, 82 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/85f2303e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
index 3fbdb2a..26b0c30 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala
@@ -267,6 +267,7 @@ object FunctionRegistry {
 expression[Concat]("concat"),
 expression[ConcatWs]("concat_ws"),
 expression[Decode]("decode"),
+expression[Elt]("elt"),
 expression[Encode]("encode"),
 expression[FindInSet]("find_in_set"),
 expression[FormatNumber]("format_number"),

http://git-wip-us.apache.org/repos/asf/spark/blob/85f2303e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala
index c15a2df..98f25a9 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ExpectsInputTypes.scala
@@ -57,7 +57,8 @@ trait ExpectsInputTypes extends Expression {
 
 
 /**
- * A mixin for the analyzer to perform implicit type casting using 
[[ImplicitTypeCasts]].
+ * A mixin for the analyzer to perform implicit type casting using
+ * [[org.apache.spark.sql.catalyst.analysis.TypeCoercion.ImplicitTypeCasts]].
  */
 trait ImplicitCastInputTypes extends ExpectsInputTypes {
   // No other methods

http://git-wip-us.apache.org/repos/asf/spark/blob/85f2303e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
index 44ff7fd..b0df957 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/stringExpressions.scala
@@ -21,6 +21,7 @@ import java.text.{DecimalFormat, DecimalFormatSymbols}
 import java.util.{HashMap, Locale, Map => JMap}
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.expressions.codegen._
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.types._
@@ -162,6 +163,46 @@ case class ConcatWs(children: Seq[Expression])
   }
 }
 
+@ExpressionDescription(
+  usage = "_FUNC_(n, str1, str2, ...) - returns the n-th string, e.g. returns str2 when n is 2",
+  extended = "> SELECT _FUNC_(1, 'scala', 'java') FROM src LIMIT 1;\n" + "'scala'")
+case class Elt(children: Seq[Expression])
+  extends Expression with 

spark git commit: [SPARK-16313][SQL] Spark should not silently drop exceptions in file listing

2016-06-30 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 4dc7d377f -> 17c7522c8


[SPARK-16313][SQL] Spark should not silently drop exceptions in file listing

## What changes were proposed in this pull request?
Spark silently drops exceptions during file listing. This is very bad behavior 
because it can mask legitimate errors and the resulting plan will silently 
return 0 rows. This patch changes the behavior so that such errors are no 
longer silently dropped.
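
A minimal sketch of the resulting pattern (not the actual `ListingFileCatalog` 
code): listing failures now propagate unless the caller explicitly asked for 
missing paths to be ignored.

```
import java.io.FileNotFoundException
import org.apache.hadoop.fs.{FileStatus, FileSystem, Path}

// Sketch: rethrow listing failures instead of swallowing them, unless the
// caller explicitly opted in to ignoring missing paths.
def listFiles(fs: FileSystem, path: Path, ignoreMissingFiles: Boolean): Seq[FileStatus] = {
  try {
    fs.listStatus(path).toSeq
  } catch {
    case _: FileNotFoundException if ignoreMissingFiles => Seq.empty
    // any other failure (or a missing path when not ignoring) surfaces to the user
  }
}
```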

## How was this patch tested?
Manually verified.

Author: Reynold Xin 

Closes #13987 from rxin/SPARK-16313.

(cherry picked from commit 3d75a5b2a76eba0855d73476dc2fd579c612d521)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/17c7522c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/17c7522c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/17c7522c

Branch: refs/heads/branch-2.0
Commit: 17c7522c8cb8f400408cbdc3b8b1251bbca53eec
Parents: 4dc7d37
Author: Reynold Xin 
Authored: Thu Jun 30 16:51:11 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jun 30 16:51:15 2016 -0700

--
 python/pyspark/sql/context.py   |  2 +-
 python/pyspark/sql/streaming.py |  2 +-
 .../sql/execution/datasources/DataSource.scala  |  3 ++-
 .../datasources/ListingFileCatalog.scala| 22 +++-
 .../datasources/fileSourceInterfaces.scala  | 11 ++
 .../sql/streaming/FileStreamSourceSuite.scala   |  2 +-
 6 files changed, 29 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/17c7522c/python/pyspark/sql/context.py
--
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 3503fb9..8c984b3 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -440,7 +440,7 @@ class SQLContext(object):
 
 :return: :class:`DataStreamReader`
 
->>> text_sdf = 
sqlContext.readStream.text(os.path.join(tempfile.mkdtemp(), 'data'))
+>>> text_sdf = sqlContext.readStream.text(tempfile.mkdtemp())
 >>> text_sdf.isStreaming
 True
 """

http://git-wip-us.apache.org/repos/asf/spark/blob/17c7522c/python/pyspark/sql/streaming.py
--
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 8cf7098..bffe398 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -437,7 +437,7 @@ class DataStreamReader(OptionUtils):
 
 :param paths: string, or list of strings, for input path(s).
 
->>> text_sdf = spark.readStream.text(os.path.join(tempfile.mkdtemp(), 
'data'))
+>>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
 >>> text_sdf.isStreaming
 True
 >>> "value" in str(text_sdf.schema)

http://git-wip-us.apache.org/repos/asf/spark/blob/17c7522c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 557445c..a4110d7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -364,7 +364,8 @@ case class DataSource(
 }
 
 val fileCatalog =
-  new ListingFileCatalog(sparkSession, globbedPaths, options, 
partitionSchema)
+  new ListingFileCatalog(
+sparkSession, globbedPaths, options, partitionSchema, 
!checkPathExist)
 
 val dataSchema = userSpecifiedSchema.map { schema =>
   val equality =

http://git-wip-us.apache.org/repos/asf/spark/blob/17c7522c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 675e755..706ec6b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.FileNotFoundException
+
 import scala.collection.mutable
 import scala.util.Try
 
@@ -35,12 

spark git commit: [SPARK-16313][SQL] Spark should not silently drop exceptions in file listing

2016-06-30 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master fb41670c9 -> 3d75a5b2a


[SPARK-16313][SQL] Spark should not silently drop exceptions in file listing

## What changes were proposed in this pull request?
Spark silently drops exceptions during file listing. This is very bad behavior 
because it can mask legitimate errors and the resulting plan will silently 
return 0 rows. This patch changes the behavior so that such errors are no 
longer silently dropped.

## How was this patch tested?
Manually verified.

Author: Reynold Xin 

Closes #13987 from rxin/SPARK-16313.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3d75a5b2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3d75a5b2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3d75a5b2

Branch: refs/heads/master
Commit: 3d75a5b2a76eba0855d73476dc2fd579c612d521
Parents: fb41670
Author: Reynold Xin 
Authored: Thu Jun 30 16:51:11 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jun 30 16:51:11 2016 -0700

--
 python/pyspark/sql/context.py   |  2 +-
 python/pyspark/sql/streaming.py |  2 +-
 .../sql/execution/datasources/DataSource.scala  |  3 ++-
 .../datasources/ListingFileCatalog.scala| 22 +++-
 .../datasources/fileSourceInterfaces.scala  | 11 ++
 .../sql/streaming/FileStreamSourceSuite.scala   |  2 +-
 6 files changed, 29 insertions(+), 13 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3d75a5b2/python/pyspark/sql/context.py
--
diff --git a/python/pyspark/sql/context.py b/python/pyspark/sql/context.py
index 3503fb9..8c984b3 100644
--- a/python/pyspark/sql/context.py
+++ b/python/pyspark/sql/context.py
@@ -440,7 +440,7 @@ class SQLContext(object):
 
 :return: :class:`DataStreamReader`
 
->>> text_sdf = 
sqlContext.readStream.text(os.path.join(tempfile.mkdtemp(), 'data'))
+>>> text_sdf = sqlContext.readStream.text(tempfile.mkdtemp())
 >>> text_sdf.isStreaming
 True
 """

http://git-wip-us.apache.org/repos/asf/spark/blob/3d75a5b2/python/pyspark/sql/streaming.py
--
diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index 8cf7098..bffe398 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -437,7 +437,7 @@ class DataStreamReader(OptionUtils):
 
 :param paths: string, or list of strings, for input path(s).
 
->>> text_sdf = spark.readStream.text(os.path.join(tempfile.mkdtemp(), 
'data'))
+>>> text_sdf = spark.readStream.text(tempfile.mkdtemp())
 >>> text_sdf.isStreaming
 True
 >>> "value" in str(text_sdf.schema)

http://git-wip-us.apache.org/repos/asf/spark/blob/3d75a5b2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 557445c..a4110d7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -364,7 +364,8 @@ case class DataSource(
 }
 
 val fileCatalog =
-  new ListingFileCatalog(sparkSession, globbedPaths, options, 
partitionSchema)
+  new ListingFileCatalog(
+sparkSession, globbedPaths, options, partitionSchema, 
!checkPathExist)
 
 val dataSchema = userSpecifiedSchema.map { schema =>
   val equality =

http://git-wip-us.apache.org/repos/asf/spark/blob/3d75a5b2/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
index 675e755..706ec6b 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ListingFileCatalog.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.execution.datasources
 
+import java.io.FileNotFoundException
+
 import scala.collection.mutable
 import scala.util.Try
 
@@ -35,12 +37,16 @@ import org.apache.spark.sql.types.StructType
  * @param paths a list of paths to scan
  * @param partitionSchema an 

spark git commit: [SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException

2016-06-30 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 03008e049 -> 4dc7d377f


[SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException

## What changes were proposed in this pull request?
This patch appends a message suggesting that users run REFRESH TABLE or 
recreate the DataFrame when Spark sees a FileNotFoundException caused by 
stale cached metadata.
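
For example (a hypothetical session; the table name is illustrative), the 
remedies the new message points to look like this:

```
// After the underlying files of a cached table change outside Spark,
// either of these invalidates the stale metadata:
spark.sql("REFRESH TABLE my_table")
spark.catalog.refreshTable("my_table")
// ...or simply recreate the Dataset/DataFrame involved.
```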

## How was this patch tested?
Added a unit test for this in MetadataCacheSuite.

Author: petermaxlee 

Closes #14003 from petermaxlee/SPARK-16336.

(cherry picked from commit fb41670c9263a89ec233861cc91a19cf1bb19073)
Signed-off-by: Reynold Xin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4dc7d377
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4dc7d377
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4dc7d377

Branch: refs/heads/branch-2.0
Commit: 4dc7d377fba39147d8820a5a2866a2fbcb73db98
Parents: 03008e0
Author: petermaxlee 
Authored: Thu Jun 30 16:49:59 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jun 30 16:50:06 2016 -0700

--
 .../sql/execution/datasources/FileScanRDD.scala | 15 +++-
 .../apache/spark/sql/MetadataCacheSuite.scala   | 88 
 2 files changed, 102 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4dc7d377/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index f7f68b1..1314c94 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -111,7 +111,20 @@ class FileScanRDD(
   currentFile = files.next()
   logInfo(s"Reading File $currentFile")
   InputFileNameHolder.setInputFileName(currentFile.filePath)
-  currentIterator = readFunction(currentFile)
+
+  try {
+currentIterator = readFunction(currentFile)
+  } catch {
+case e: java.io.FileNotFoundException =>
+  throw new java.io.FileNotFoundException(
+e.getMessage + "\n" +
+  "It is possible the underlying files have been updated. " +
+  "You can explicitly invalidate the cache in Spark by " +
+  "running 'REFRESH TABLE tableName' command in SQL or " +
+  "by recreating the Dataset/DataFrame involved."
+  )
+  }
+
   hasNext
 } else {
   currentFile = null

http://git-wip-us.apache.org/repos/asf/spark/blob/4dc7d377/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
new file mode 100644
index 000..d872f4b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * Test suite to handle metadata cache related.
+ */
+class MetadataCacheSuite extends QueryTest with SharedSQLContext {
+
+  /** Removes one data file in the given directory. */
+  private def deleteOneFileInDirectory(dir: File): Unit = {
+assert(dir.isDirectory)
+val oneFile = dir.listFiles().find { file =>
+  !file.getName.startsWith("_") && !file.getName.startsWith(".")
+}
+assert(oneFile.isDefined)
+oneFile.foreach(_.delete())
+  }
+
+  

spark git commit: [SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException

2016-06-30 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 5d00a7bc1 -> fb41670c9


[SPARK-16336][SQL] Suggest doing table refresh upon FileNotFoundException

## What changes were proposed in this pull request?
This patch appends a message suggesting that users run REFRESH TABLE or 
recreate the DataFrame when Spark sees a FileNotFoundException caused by 
stale cached metadata.

## How was this patch tested?
Added a unit test for this in MetadataCacheSuite.

Author: petermaxlee 

Closes #14003 from petermaxlee/SPARK-16336.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fb41670c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fb41670c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fb41670c

Branch: refs/heads/master
Commit: fb41670c9263a89ec233861cc91a19cf1bb19073
Parents: 5d00a7b
Author: petermaxlee 
Authored: Thu Jun 30 16:49:59 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jun 30 16:49:59 2016 -0700

--
 .../sql/execution/datasources/FileScanRDD.scala | 15 +++-
 .../apache/spark/sql/MetadataCacheSuite.scala   | 88 
 2 files changed, 102 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fb41670c/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
index 1443057..c66da3a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileScanRDD.scala
@@ -117,7 +117,20 @@ class FileScanRDD(
   currentFile = files.next()
   logInfo(s"Reading File $currentFile")
   InputFileNameHolder.setInputFileName(currentFile.filePath)
-  currentIterator = readFunction(currentFile)
+
+  try {
+currentIterator = readFunction(currentFile)
+  } catch {
+case e: java.io.FileNotFoundException =>
+  throw new java.io.FileNotFoundException(
+e.getMessage + "\n" +
+  "It is possible the underlying files have been updated. " +
+  "You can explicitly invalidate the cache in Spark by " +
+  "running 'REFRESH TABLE tableName' command in SQL or " +
+  "by recreating the Dataset/DataFrame involved."
+  )
+  }
+
   hasNext
 } else {
   currentFile = null

http://git-wip-us.apache.org/repos/asf/spark/blob/fb41670c/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
new file mode 100644
index 000..d872f4b
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/MetadataCacheSuite.scala
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.io.File
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.test.SharedSQLContext
+
+/**
+ * Test suite to handle metadata cache related.
+ */
+class MetadataCacheSuite extends QueryTest with SharedSQLContext {
+
+  /** Removes one data file in the given directory. */
+  private def deleteOneFileInDirectory(dir: File): Unit = {
+assert(dir.isDirectory)
+val oneFile = dir.listFiles().find { file =>
+  !file.getName.startsWith("_") && !file.getName.startsWith(".")
+}
+assert(oneFile.isDefined)
+oneFile.foreach(_.delete())
+  }
+
+  test("SPARK-16336 Suggest doing table refresh when encountering 
FileNotFoundException") {
+withTempPath { (location: File) =>
+   

spark git commit: [SPARK-16256][DOCS] Fix window operation diagram

2016-06-30 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 f17ffef38 -> 03008e049


[SPARK-16256][DOCS] Fix window operation diagram

Author: Tathagata Das 

Closes #14001 from tdas/SPARK-16256-2.

(cherry picked from commit 5d00a7bc19ddeb1b5247733b55095a03ee7b1a30)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/03008e04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/03008e04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/03008e04

Branch: refs/heads/branch-2.0
Commit: 03008e049a366bc7a63b3915b42ee50320ac6f34
Parents: f17ffef
Author: Tathagata Das 
Authored: Thu Jun 30 14:01:34 2016 -0700
Committer: Tathagata Das 
Committed: Thu Jun 30 14:01:56 2016 -0700

--
 docs/img/structured-streaming-late-data.png| Bin 138931 -> 138226 bytes
 docs/img/structured-streaming-window.png   | Bin 128930 -> 132875 bytes
 docs/img/structured-streaming.pptx | Bin 1105315 -> 1105413 bytes
 docs/structured-streaming-programming-guide.md |   2 +-
 4 files changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/03008e04/docs/img/structured-streaming-late-data.png
--
diff --git a/docs/img/structured-streaming-late-data.png 
b/docs/img/structured-streaming-late-data.png
index 5276b47..2283f67 100644
Binary files a/docs/img/structured-streaming-late-data.png and 
b/docs/img/structured-streaming-late-data.png differ

http://git-wip-us.apache.org/repos/asf/spark/blob/03008e04/docs/img/structured-streaming-window.png
--
diff --git a/docs/img/structured-streaming-window.png 
b/docs/img/structured-streaming-window.png
index be9d3fb..c1842b1 100644
Binary files a/docs/img/structured-streaming-window.png and 
b/docs/img/structured-streaming-window.png differ

http://git-wip-us.apache.org/repos/asf/spark/blob/03008e04/docs/img/structured-streaming.pptx
--
diff --git a/docs/img/structured-streaming.pptx 
b/docs/img/structured-streaming.pptx
index c278323..6aad2ed 100644
Binary files a/docs/img/structured-streaming.pptx and 
b/docs/img/structured-streaming.pptx differ

http://git-wip-us.apache.org/repos/asf/spark/blob/03008e04/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 5932566..7949396 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -620,7 +620,7 @@ df.groupBy("type").count()
 ### Window Operations on Event Time
 Aggregations over a sliding event-time window are straightforward with 
Structured Streaming. The key idea to understand about window-based 
aggregations are very similar to grouped aggregations. In a grouped 
aggregation, aggregate values (e.g. counts) are maintained for each unique 
value in the user-specified grouping column. In case of window-based 
aggregations, aggregate values are maintained for each window the event-time of 
a row falls into. Let's understand this with an illustration. 
 
-Imagine the quick example is modified and the stream contains lines along with 
the time when the line was generated. Instead of running word counts, we want 
to count words within 10 minute windows, updating every 5 minutes. That is, 
word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 
12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived 
after 12:00 but before 12:10. Now, consider a word that was received at 12:07. 
This word should increment the counts corresponding to two windows 12:00 - 
12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping 
key (i.e. the word) and the window (can be calculated from the event-time).
+Imagine our quick example is modified and the stream now contains lines along 
with the time when the line was generated. Instead of running word counts, we 
want to count words within 10 minute windows, updating every 5 minutes. That 
is, word counts in words received between 10 minute windows 12:00 - 12:10, 
12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that 
arrived after 12:00 but before 12:10. Now, consider a word that was received at 
12:07. This word should increment the counts corresponding to two windows 12:00 
- 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping 
key (i.e. the word) and the window (can be 

spark git commit: [SPARK-16256][DOCS] Fix window operation diagram

2016-06-30 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master c62263340 -> 5d00a7bc1


[SPARK-16256][DOCS] Fix window operation diagram

Author: Tathagata Das 

Closes #14001 from tdas/SPARK-16256-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d00a7bc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d00a7bc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d00a7bc

Branch: refs/heads/master
Commit: 5d00a7bc19ddeb1b5247733b55095a03ee7b1a30
Parents: c622633
Author: Tathagata Das 
Authored: Thu Jun 30 14:01:34 2016 -0700
Committer: Tathagata Das 
Committed: Thu Jun 30 14:01:34 2016 -0700

--
 docs/img/structured-streaming-late-data.png| Bin 138931 -> 138226 bytes
 docs/img/structured-streaming-window.png   | Bin 128930 -> 132875 bytes
 docs/img/structured-streaming.pptx | Bin 1105315 -> 1105413 bytes
 docs/structured-streaming-programming-guide.md |   2 +-
 4 files changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5d00a7bc/docs/img/structured-streaming-late-data.png
--
diff --git a/docs/img/structured-streaming-late-data.png 
b/docs/img/structured-streaming-late-data.png
index 5276b47..2283f67 100644
Binary files a/docs/img/structured-streaming-late-data.png and 
b/docs/img/structured-streaming-late-data.png differ

http://git-wip-us.apache.org/repos/asf/spark/blob/5d00a7bc/docs/img/structured-streaming-window.png
--
diff --git a/docs/img/structured-streaming-window.png 
b/docs/img/structured-streaming-window.png
index be9d3fb..c1842b1 100644
Binary files a/docs/img/structured-streaming-window.png and 
b/docs/img/structured-streaming-window.png differ

http://git-wip-us.apache.org/repos/asf/spark/blob/5d00a7bc/docs/img/structured-streaming.pptx
--
diff --git a/docs/img/structured-streaming.pptx 
b/docs/img/structured-streaming.pptx
index c278323..6aad2ed 100644
Binary files a/docs/img/structured-streaming.pptx and 
b/docs/img/structured-streaming.pptx differ

http://git-wip-us.apache.org/repos/asf/spark/blob/5d00a7bc/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 5932566..7949396 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -620,7 +620,7 @@ df.groupBy("type").count()
 ### Window Operations on Event Time
 Aggregations over a sliding event-time window are straightforward with 
Structured Streaming. The key idea to understand about window-based 
aggregations are very similar to grouped aggregations. In a grouped 
aggregation, aggregate values (e.g. counts) are maintained for each unique 
value in the user-specified grouping column. In case of window-based 
aggregations, aggregate values are maintained for each window the event-time of 
a row falls into. Let's understand this with an illustration. 
 
-Imagine the quick example is modified and the stream contains lines along with 
the time when the line was generated. Instead of running word counts, we want 
to count words within 10 minute windows, updating every 5 minutes. That is, 
word counts in words received between 10 minute windows 12:00 - 12:10, 12:05 - 
12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that arrived 
after 12:00 but before 12:10. Now, consider a word that was received at 12:07. 
This word should increment the counts corresponding to two windows 12:00 - 
12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping 
key (i.e. the word) and the window (can be calculated from the event-time).
+Imagine our quick example is modified and the stream now contains lines along 
with the time when the line was generated. Instead of running word counts, we 
want to count words within 10 minute windows, updating every 5 minutes. That 
is, word counts in words received between 10 minute windows 12:00 - 12:10, 
12:05 - 12:15, 12:10 - 12:20, etc. Note that 12:00 - 12:10 means data that 
arrived after 12:00 but before 12:10. Now, consider a word that was received at 
12:07. This word should increment the counts corresponding to two windows 12:00 
- 12:10 and 12:05 - 12:15. So the counts will be indexed by both, the grouping 
key (i.e. the word) and the window (can be calculated from the event-time).
 
 The result tables would look something like the following.
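
For reference, the windowed count described in the updated paragraph corresponds to a 
grouping along the following lines (a sketch only; `words` is assumed to be the input 
DataFrame, and the column names `timestamp` and `word` follow the guide's quick example):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, window}

// Counts per (10-minute window sliding every 5 minutes, word) pair, so a word
// received at 12:07 contributes to both the 12:00 - 12:10 and 12:05 - 12:15 windows.
def windowedCounts(words: DataFrame): DataFrame =
  words
    .groupBy(window(col("timestamp"), "10 minutes", "5 minutes"), col("word"))
    .count()
```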
 



spark git commit: [SPARK-16212][STREAMING][KAFKA] code cleanup from review feedback

2016-06-30 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master 46395db80 -> c62263340


[SPARK-16212][STREAMING][KAFKA] code cleanup from review feedback

## What changes were proposed in this pull request?
Code cleanup in kafka-0-8 to match the changes suggested for the kafka-0-10 branch.

## How was this patch tested?
unit tests

Author: cody koeninger 

Closes #13908 from koeninger/kafka-0-8-cleanup.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c6226334
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c6226334
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c6226334

Branch: refs/heads/master
Commit: c62263340edb6976a10f274e716fde6cd2c5bf34
Parents: 46395db
Author: cody koeninger 
Authored: Thu Jun 30 13:16:58 2016 -0700
Committer: Tathagata Das 
Committed: Thu Jun 30 13:16:58 2016 -0700

--
 .../spark/streaming/kafka/DirectKafkaInputDStream.scala | 12 ++--
 .../org/apache/spark/streaming/kafka/KafkaRDD.scala |  9 ++---
 .../streaming/kafka/JavaDirectKafkaStreamSuite.java |  5 -
 3 files changed, 12 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c6226334/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
--
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
index fb58ed7..c3c7993 100644
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/DirectKafkaInputDStream.scala
@@ -34,7 +34,7 @@ import org.apache.spark.streaming.scheduler.{RateController, 
StreamInputInfo}
 import org.apache.spark.streaming.scheduler.rate.RateEstimator
 
 /**
- *  A stream of {@link org.apache.spark.streaming.kafka.KafkaRDD} where
+ *  A stream of [[KafkaRDD]] where
  * each given Kafka topic/partition corresponds to an RDD partition.
  * The spark configuration spark.streaming.kafka.maxRatePerPartition gives the 
maximum number
  *  of messages
@@ -43,7 +43,7 @@ import org.apache.spark.streaming.scheduler.rate.RateEstimator
  * and this DStream is not responsible for committing offsets,
  * so that you can control exactly-once semantics.
  * For an easy interface to Kafka-managed offsets,
- *  see {@link org.apache.spark.streaming.kafka.KafkaCluster}
+ *  see [[KafkaCluster]]
  * @param kafkaParams Kafka <a href="http://kafka.apache.org/documentation.html#configuration">
  * configuration parameters</a>.
  *   Requires "metadata.broker.list" or "bootstrap.servers" to be set with 
Kafka broker(s),
@@ -132,7 +132,7 @@ class DirectKafkaInputDStream[
   if (retries <= 0) {
 throw new SparkException(err)
   } else {
-log.error(err)
+logError(err)
 Thread.sleep(kc.config.refreshLeaderBackoffMs)
 latestLeaderOffsets(retries - 1)
   }
@@ -194,7 +194,7 @@ class DirectKafkaInputDStream[
   data.asInstanceOf[mutable.HashMap[Time, 
Array[OffsetRange.OffsetRangeTuple]]]
 }
 
-override def update(time: Time) {
+override def update(time: Time): Unit = {
   batchForTime.clear()
   generatedRDDs.foreach { kv =>
 val a = kv._2.asInstanceOf[KafkaRDD[K, V, U, T, 
R]].offsetRanges.map(_.toTuple).toArray
@@ -202,9 +202,9 @@ class DirectKafkaInputDStream[
   }
 }
 
-override def cleanup(time: Time) { }
+override def cleanup(time: Time): Unit = { }
 
-override def restore() {
+override def restore(): Unit = {
   // this is assuming that the topics don't change during execution, which 
is true currently
   val topics = fromOffsets.keySet
   val leaders = KafkaCluster.checkErrors(kc.findLeaders(topics))
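
For context, the public API this class backs is typically used along the following lines 
(a sketch only; the broker address and topic name are placeholders, not taken from this 
commit):

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka.KafkaUtils

// Builds a direct stream whose offsets are tracked by the stream itself,
// as described in the class doc above. Broker and topic are placeholders.
def buildStream(ssc: StreamingContext): InputDStream[(String, String)] = {
  val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
  KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, Set("events"))
}
```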

http://git-wip-us.apache.org/repos/asf/spark/blob/c6226334/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
--
diff --git 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
index d4881b1..2b92577 100644
--- 
a/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
+++ 
b/external/kafka-0-8/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala
@@ -129,7 +129,7 @@ class KafkaRDD[
 val part = thePart.asInstanceOf[KafkaRDDPartition]
 assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
 if (part.fromOffset == part.untilOffset) {
-  

spark git commit: [SPARK-16289][SQL] Implement posexplode table generating function

2016-06-30 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master fdf9f94f8 -> 46395db80


[SPARK-16289][SQL] Implement posexplode table generating function

## What changes were proposed in this pull request?

This PR implements the `posexplode` table-generating function. Currently, the master 
branch raises the following exception for a `map` argument, which differs from Hive's 
behavior.

**Before**
```scala
scala> sql("select posexplode(map('a', 1, 'b', 2))").show
org.apache.spark.sql.AnalysisException: No handler for Hive UDF ... 
posexplode() takes an array as a parameter; line 1 pos 7
```

**After**
```scala
scala> sql("select posexplode(map('a', 1, 'b', 2))").show
+---+---+-+
|pos|key|value|
+---+---+-+
|  0|  a|1|
|  1|  b|2|
+---+---+-+
```

For an `array` argument, the behavior after this change is the same as before.
```
scala> sql("select posexplode(array(1, 2, 3))").show
+---+---+
|pos|col|
+---+---+
|  0|  1|
|  1|  2|
|  2|  3|
+---+---+
```
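
The function is also exposed through the Scala DataFrame API added by this patch; a 
minimal sketch, assuming a spark-shell session (so `spark.implicits._` is available) and 
a placeholder column name:

```scala
import org.apache.spark.sql.functions.{col, posexplode}
import spark.implicits._  // `spark` is the active SparkSession, e.g. in spark-shell

val df = Seq(Seq(1, 2, 3)).toDF("values")
// Produces the same `pos` and `col` columns as the SQL output above.
df.select(posexplode(col("values"))).show()
```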

## How was this patch tested?

Pass the Jenkins tests with newly added testcases.

Author: Dongjoon Hyun 

Closes #13971 from dongjoon-hyun/SPARK-16289.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/46395db8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/46395db8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/46395db8

Branch: refs/heads/master
Commit: 46395db80e3304e3f3a1ebdc8aadb8f2819b48b4
Parents: fdf9f94
Author: Dongjoon Hyun 
Authored: Thu Jun 30 12:03:54 2016 -0700
Committer: Reynold Xin 
Committed: Thu Jun 30 12:03:54 2016 -0700

--
 R/pkg/NAMESPACE |  1 +
 R/pkg/R/functions.R | 17 
 R/pkg/R/generics.R  |  4 +
 R/pkg/inst/tests/testthat/test_sparkSQL.R   |  2 +-
 python/pyspark/sql/functions.py | 21 +
 .../catalyst/analysis/FunctionRegistry.scala|  1 +
 .../sql/catalyst/expressions/generators.scala   | 66 +++---
 .../analysis/ExpressionTypeCheckingSuite.scala  |  2 +
 .../expressions/GeneratorExpressionSuite.scala  | 71 +++
 .../scala/org/apache/spark/sql/Column.scala |  1 +
 .../scala/org/apache/spark/sql/functions.scala  |  8 ++
 .../spark/sql/ColumnExpressionSuite.scala   | 60 -
 .../spark/sql/GeneratorFunctionSuite.scala  | 92 
 .../spark/sql/hive/HiveSessionCatalog.scala |  2 +-
 14 files changed, 276 insertions(+), 72 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/46395db8/R/pkg/NAMESPACE
--
diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index e0ffde9..abc6588 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -234,6 +234,7 @@ exportMethods("%in%",
   "over",
   "percent_rank",
   "pmod",
+  "posexplode",
   "quarter",
   "rand",
   "randn",

http://git-wip-us.apache.org/repos/asf/spark/blob/46395db8/R/pkg/R/functions.R
--
diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R
index 09e5afa..52d46f9 100644
--- a/R/pkg/R/functions.R
+++ b/R/pkg/R/functions.R
@@ -2934,3 +2934,20 @@ setMethod("sort_array",
 jc <- callJStatic("org.apache.spark.sql.functions", "sort_array", 
x@jc, asc)
 column(jc)
   })
+
+#' posexplode
+#'
+#' Creates a new row for each element with position in the given array or map 
column.
+#'
+#' @rdname posexplode
+#' @name posexplode
+#' @family collection_funcs
+#' @export
+#' @examples \dontrun{posexplode(df$c)}
+#' @note posexplode since 2.1.0
+setMethod("posexplode",
+  signature(x = "Column"),
+  function(x) {
+jc <- callJStatic("org.apache.spark.sql.functions", "posexplode", 
x@jc)
+column(jc)
+  })

http://git-wip-us.apache.org/repos/asf/spark/blob/46395db8/R/pkg/R/generics.R
--
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 0e4350f..d9080b6 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -1050,6 +1050,10 @@ setGeneric("percent_rank", function(x) { 
standardGeneric("percent_rank") })
 #' @export
 setGeneric("pmod", function(y, x) { standardGeneric("pmod") })
 
+#' @rdname posexplode
+#' @export
+setGeneric("posexplode", function(x) { standardGeneric("posexplode") })
+
 #' @rdname quarter
 #' @export
 setGeneric("quarter", function(x) { standardGeneric("quarter") })

http://git-wip-us.apache.org/repos/asf/spark/blob/46395db8/R/pkg/inst/tests/testthat/test_sparkSQL.R
--
diff --git 

spark git commit: [SPARK-15865][CORE] Blacklist should not result in job hanging with less than 4 executors

2016-06-30 Thread irashid
Repository: spark
Updated Branches:
  refs/heads/master 07f46afc7 -> fdf9f94f8


[SPARK-15865][CORE] Blacklist should not result in job hanging with less than 4 
executors

## What changes were proposed in this pull request?

Before this change, when you turned on blacklisting with 
`spark.scheduler.executorTaskBlacklistTime` but had fewer than 
`spark.task.maxFailures` executors, you could end up with a "hung" job after some 
task failures.

Whenever a taskset is unable to schedule anything in 
resourceOfferSingleTaskSet, we check whether the last pending task can be 
scheduled on *any* known executor. If not, the taskset and any corresponding 
jobs are failed.
* Worst case, this is O(maxTaskFailures + numTasks).  But unless many executors 
are bad, this should be small
* This does not fail as fast as possible -- when a task becomes unschedulable, 
we keep scheduling other tasks.  This is to avoid an O(numPendingTasks * 
numExecutors) operation
* Also, it is conceivable this fails too quickly.  You may be 1 millisecond 
away from unblacklisting a place for a task to run, or acquiring a new executor.
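
To hit the code path described above, blacklisting has to be enabled; a configuration 
sketch with illustrative values only (treating the blacklist time as milliseconds is an 
assumption about this legacy setting, not something stated in this commit):

```scala
import org.apache.spark.SparkConf

// Blacklist a failed (executor, task) pair for 10 seconds and keep the default
// of 4 task failures before the job is failed.
val conf = new SparkConf()
  .set("spark.scheduler.executorTaskBlacklistTime", "10000")
  .set("spark.task.maxFailures", "4")
```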

## How was this patch tested?

Added unit test which failed before the change, ran new test 5k times manually, 
ran all scheduler tests manually, and the full suite via jenkins.

Author: Imran Rashid 

Closes #13603 from squito/progress_w_few_execs_and_blacklist.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/fdf9f94f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/fdf9f94f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/fdf9f94f

Branch: refs/heads/master
Commit: fdf9f94f8c8861a00cd8415073f842b857c397f7
Parents: 07f46af
Author: Imran Rashid 
Authored: Thu Jun 30 13:36:06 2016 -0500
Committer: Imran Rashid 
Committed: Thu Jun 30 13:36:06 2016 -0500

--
 .../org/apache/spark/scheduler/TaskInfo.scala   |  4 +
 .../spark/scheduler/TaskSchedulerImpl.scala |  3 +
 .../apache/spark/scheduler/TaskSetManager.scala | 54 +++-
 .../apache/spark/executor/ExecutorSuite.scala   |  2 +-
 .../scheduler/BlacklistIntegrationSuite.scala   | 33 ++-
 .../org/apache/spark/scheduler/FakeTask.scala   |  5 +-
 .../org/apache/spark/scheduler/PoolSuite.scala  |  2 +-
 .../scheduler/SchedulerIntegrationSuite.scala   | 14 +--
 .../scheduler/TaskSchedulerImplSuite.scala  | 92 
 9 files changed, 194 insertions(+), 15 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
index 2d89232..eeb7963 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskInfo.scala
@@ -30,6 +30,10 @@ import org.apache.spark.annotation.DeveloperApi
 @DeveloperApi
 class TaskInfo(
 val taskId: Long,
+/**
+ * The index of this task within its task set. Not necessarily the same as 
the ID of the RDD
+ * partition that the task is computing.
+ */
 val index: Int,
 val attemptNumber: Int,
 val launchTime: Long,

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 821e3ee..2ce49ca 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -279,6 +279,9 @@ private[spark] class TaskSchedulerImpl(
 }
   }
 }
+if (!launchedTask) {
+  taskSet.abortIfCompletelyBlacklisted(executorIdToHost.keys)
+}
 return launchedTask
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/fdf9f94f/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
index 2eedd20..2fef447 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSetManager.scala
@@ -83,7 +83,7 @@ private[spark] class TaskSetManager(
   val copiesRunning = new Array[Int](numTasks)
   val successful = new Array[Boolean](numTasks)
   private val 

spark git commit: [SPARK-13850] Force the sorter to Spill when number of elements in th…

2016-06-30 Thread davies
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 98056a1f8 -> f17ffef38


[SPARK-13850] Force the sorter to Spill when number of elements in th…

Force the sorter to spill when the number of elements in the pointer array 
reaches a certain size. This works around the issue of TimSort failing on large 
buffer sizes.

Tested by running a job that was failing without this change due to the TimSort 
bug.

Author: Sital Kedia 

Closes #13107 from sitalkedia/fix_TimSort.

(cherry picked from commit 07f46afc733b1718d528a6ea5c0d774f047024fa)
Signed-off-by: Davies Liu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f17ffef3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f17ffef3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f17ffef3

Branch: refs/heads/branch-2.0
Commit: f17ffef38b4749b6b801c198ec207434a4db0c38
Parents: 98056a1
Author: Sital Kedia 
Authored: Thu Jun 30 10:53:18 2016 -0700
Committer: Davies Liu 
Committed: Thu Jun 30 10:54:37 2016 -0700

--
 .../shuffle/sort/ShuffleExternalSorter.java | 10 ++---
 .../unsafe/sort/UnsafeExternalSorter.java   | 23 +---
 .../unsafe/sort/UnsafeExternalSorterSuite.java  |  3 +++
 .../sql/execution/UnsafeExternalRowSorter.java  |  2 ++
 .../UnsafeFixedWidthAggregationMap.java |  3 +++
 .../sql/execution/UnsafeKVExternalSorter.java   |  8 +--
 .../apache/spark/sql/execution/WindowExec.scala |  2 ++
 .../execution/datasources/WriterContainer.scala |  5 -
 .../execution/joins/CartesianProductExec.scala  |  2 ++
 .../execution/streaming/FileStreamSink.scala|  5 -
 .../execution/UnsafeKVExternalSorterSuite.scala |  4 +++-
 .../spark/sql/hive/hiveWriterContainers.scala   |  5 -
 12 files changed, 60 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/f17ffef3/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 014aef8..696ee73 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   private final TaskContext taskContext;
   private final ShuffleWriteMetrics writeMetrics;
 
-  /** Force this sorter to spill when there are this many elements in memory. 
For testing only */
+  /**
+   * Force this sorter to spill when there are this many elements in memory. 
The default value is
+   * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to 
be 8G.
+   */
   private final long numElementsForSpillThreshold;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
@@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no 
units are provided
 this.fileBufferSizeBytes = (int) 
conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
 this.numElementsForSpillThreshold =
-  conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 
Long.MAX_VALUE);
+  conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 
* 1024 * 1024);
 this.writeMetrics = writeMetrics;
 this.inMemSorter = new ShuffleInMemorySorter(
   this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", 
true));
@@ -372,7 +375,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 
 // for tests
 assert(inMemSorter != null);
-if (inMemSorter.numRecords() > numElementsForSpillThreshold) {
+if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+  logger.info("Spilling data because number of spilledRecords crossed the 
threshold " + numElementsForSpillThreshold);
   spill();
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/f17ffef3/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index e14a23f..8a980d4 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -27,6 +27,7 

spark git commit: [SPARK-13850] Force the sorter to Spill when number of elements in th…

2016-06-30 Thread davies
Repository: spark
Updated Branches:
  refs/heads/master 5344bade8 -> 07f46afc7


[SPARK-13850] Force the sorter to Spill when number of elements in th…

## What changes were proposed in this pull request?

Force the sorter to spill when the number of elements in the pointer array 
reaches a certain size. This works around the issue of TimSort failing on large 
buffer sizes.
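
The threshold is exposed as `spark.shuffle.spill.numElementsForceSpillThreshold` (see the 
diff below); a sketch of lowering it, with an arbitrary example value rather than a 
recommendation:

```scala
import org.apache.spark.SparkConf

// Force a spill after 16M in-memory records instead of the new default of
// 1024 * 1024 * 1024 elements.
val conf = new SparkConf()
  .set("spark.shuffle.spill.numElementsForceSpillThreshold", (16 * 1024 * 1024).toString)
```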

## How was this patch tested?

Tested by running a job that was failing without this change due to the TimSort 
bug.

Author: Sital Kedia 

Closes #13107 from sitalkedia/fix_TimSort.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/07f46afc
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/07f46afc
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/07f46afc

Branch: refs/heads/master
Commit: 07f46afc733b1718d528a6ea5c0d774f047024fa
Parents: 5344bad
Author: Sital Kedia 
Authored: Thu Jun 30 10:53:18 2016 -0700
Committer: Davies Liu 
Committed: Thu Jun 30 10:53:18 2016 -0700

--
 .../shuffle/sort/ShuffleExternalSorter.java | 10 ++---
 .../unsafe/sort/UnsafeExternalSorter.java   | 23 +---
 .../unsafe/sort/UnsafeExternalSorterSuite.java  |  3 +++
 .../sql/execution/UnsafeExternalRowSorter.java  |  2 ++
 .../UnsafeFixedWidthAggregationMap.java |  3 +++
 .../sql/execution/UnsafeKVExternalSorter.java   |  8 +--
 .../apache/spark/sql/execution/WindowExec.scala |  2 ++
 .../execution/datasources/WriterContainer.scala |  5 -
 .../execution/joins/CartesianProductExec.scala  |  2 ++
 .../execution/streaming/FileStreamSink.scala|  5 -
 .../execution/UnsafeKVExternalSorterSuite.scala |  4 +++-
 .../spark/sql/hive/hiveWriterContainers.scala   |  5 -
 12 files changed, 60 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
index 014aef8..696ee73 100644
--- 
a/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/shuffle/sort/ShuffleExternalSorter.java
@@ -72,7 +72,10 @@ final class ShuffleExternalSorter extends MemoryConsumer {
   private final TaskContext taskContext;
   private final ShuffleWriteMetrics writeMetrics;
 
-  /** Force this sorter to spill when there are this many elements in memory. 
For testing only */
+  /**
+   * Force this sorter to spill when there are this many elements in memory. 
The default value is
+   * 1024 * 1024 * 1024, which allows the maximum size of the pointer array to 
be 8G.
+   */
   private final long numElementsForSpillThreshold;
 
   /** The buffer size to use when writing spills using DiskBlockObjectWriter */
@@ -114,7 +117,7 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 // Use getSizeAsKb (not bytes) to maintain backwards compatibility if no 
units are provided
 this.fileBufferSizeBytes = (int) 
conf.getSizeAsKb("spark.shuffle.file.buffer", "32k") * 1024;
 this.numElementsForSpillThreshold =
-  conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 
Long.MAX_VALUE);
+  conf.getLong("spark.shuffle.spill.numElementsForceSpillThreshold", 1024 
* 1024 * 1024);
 this.writeMetrics = writeMetrics;
 this.inMemSorter = new ShuffleInMemorySorter(
   this, initialSize, conf.getBoolean("spark.shuffle.sort.useRadixSort", 
true));
@@ -372,7 +375,8 @@ final class ShuffleExternalSorter extends MemoryConsumer {
 
 // for tests
 assert(inMemSorter != null);
-if (inMemSorter.numRecords() > numElementsForSpillThreshold) {
+if (inMemSorter.numRecords() >= numElementsForSpillThreshold) {
+  logger.info("Spilling data because number of spilledRecords crossed the 
threshold " + numElementsForSpillThreshold);
   spill();
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/07f46afc/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
--
diff --git 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
index ec15f0b..d6a255e 100644
--- 
a/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
+++ 
b/core/src/main/java/org/apache/spark/util/collection/unsafe/sort/UnsafeExternalSorter.java
@@ -27,6 +27,7 @@ import 

spark git commit: [SPARK-15820][PYSPARK][SQL] Add Catalog.refreshTable into python API

2016-06-30 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master 5320adc86 -> 5344bade8


[SPARK-15820][PYSPARK][SQL] Add Catalog.refreshTable into python API

## What changes were proposed in this pull request?

Add the Catalog.refreshTable API to the Python interface for Spark SQL.

## How was this patch tested?

Existing test.

Author: WeichenXu 

Closes #13558 from WeichenXu123/update_python_sql_interface_refreshTable.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5344bade
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5344bade
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5344bade

Branch: refs/heads/master
Commit: 5344bade8efb6f12aa43fbfbbbc2e3c0c7d16d98
Parents: 5320adc
Author: WeichenXu 
Authored: Thu Jun 30 23:00:39 2016 +0800
Committer: Cheng Lian 
Committed: Thu Jun 30 23:00:39 2016 +0800

--
 python/pyspark/sql/catalog.py   | 5 +
 .../src/main/scala/org/apache/spark/sql/catalog/Catalog.scala   | 2 +-
 2 files changed, 6 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5344bade/python/pyspark/sql/catalog.py
--
diff --git a/python/pyspark/sql/catalog.py b/python/pyspark/sql/catalog.py
index 3033f14..4af930a 100644
--- a/python/pyspark/sql/catalog.py
+++ b/python/pyspark/sql/catalog.py
@@ -232,6 +232,11 @@ class Catalog(object):
 """Removes all cached tables from the in-memory cache."""
 self._jcatalog.clearCache()
 
+@since(2.0)
+def refreshTable(self, tableName):
+"""Invalidate and refresh all the cached metadata of the given 
table."""
+self._jcatalog.refreshTable(tableName)
+
 def _reset(self):
 """(Internal use only) Drop all existing databases (except "default"), 
tables,
 partitions and functions, and set the current database to "default".

http://git-wip-us.apache.org/repos/asf/spark/blob/5344bade/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
--
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
index 083a63c..91ed9b3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/catalog/Catalog.scala
@@ -214,7 +214,7 @@ abstract class Catalog {
   def clearCache(): Unit
 
   /**
-   * Invalidate and refresh all the cached the metadata of the given table. 
For performance reasons,
+   * Invalidate and refresh all the cached metadata of the given table. For 
performance reasons,
* Spark SQL or the external data source library it uses might cache certain 
metadata about a
* table, such as the location of blocks. When those change outside of Spark 
SQL, users should
* call this function to invalidate the cache.





spark git commit: [BUILD] Fix version in poms related to kafka-0-10

2016-06-30 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 56207fc3b -> 98056a1f8


[BUILD] Fix version in poms related to kafka-0-10

Self-explanatory.

Author: Tathagata Das 

Closes #13994 from tdas/SPARK-12177-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/98056a1f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/98056a1f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/98056a1f

Branch: refs/heads/branch-2.0
Commit: 98056a1f8683385599f194a4b963769e3342bff3
Parents: 56207fc
Author: Tathagata Das 
Authored: Thu Jun 30 22:10:56 2016 +0800
Committer: Cheng Lian 
Committed: Thu Jun 30 22:10:56 2016 +0800

--
 external/kafka-0-10-assembly/pom.xml | 2 +-
 external/kafka-0-10/pom.xml  | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/98056a1f/external/kafka-0-10-assembly/pom.xml
--
diff --git a/external/kafka-0-10-assembly/pom.xml 
b/external/kafka-0-10-assembly/pom.xml
index f2468d1..59f41f1 100644
--- a/external/kafka-0-10-assembly/pom.xml
+++ b/external/kafka-0-10-assembly/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.0-SNAPSHOT
+2.0.1-SNAPSHOT
 ../../pom.xml
   
 

http://git-wip-us.apache.org/repos/asf/spark/blob/98056a1f/external/kafka-0-10/pom.xml
--
diff --git a/external/kafka-0-10/pom.xml b/external/kafka-0-10/pom.xml
index 50395f6..2696561 100644
--- a/external/kafka-0-10/pom.xml
+++ b/external/kafka-0-10/pom.xml
@@ -21,7 +21,7 @@
   
 org.apache.spark
 spark-parent_2.11
-2.0.0-SNAPSHOT
+2.0.1-SNAPSHOT
 ../../pom.xml
   
 





spark git commit: [SPARK-16071][SQL] Checks size limit when doubling the array size in BufferHolder

2016-06-30 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 6a4f4c1d7 -> 56207fc3b


[SPARK-16071][SQL] Checks size limit when doubling the array size in 
BufferHolder

## What changes were proposed in this pull request?

This PR checks the size limit when doubling the array size in BufferHolder to 
avoid integer overflow.

## How was this patch tested?

Manual test.

Author: Sean Zhong 

Closes #13829 from clockfly/SPARK-16071_2.

(cherry picked from commit 5320adc863ca85b489cef79f156392b9da36e53f)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/56207fc3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/56207fc3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/56207fc3

Branch: refs/heads/branch-2.0
Commit: 56207fc3b26cdb8cb50ce460eeab32c06a81bb44
Parents: 6a4f4c1
Author: Sean Zhong 
Authored: Thu Jun 30 21:56:34 2016 +0800
Committer: Wenchen Fan 
Committed: Thu Jun 30 21:56:55 2016 +0800

--
 .../expressions/codegen/BufferHolder.java   | 16 +++-
 .../expressions/codegen/BufferHolderSuite.scala | 39 
 2 files changed, 53 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/56207fc3/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
--
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
index af61e20..0e4264f 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
@@ -45,7 +45,13 @@ public class BufferHolder {
   }
 
   public BufferHolder(UnsafeRow row, int initialSize) {
-this.fixedSize = UnsafeRow.calculateBitSetWidthInBytes(row.numFields()) + 
8 * row.numFields();
+int bitsetWidthInBytes = 
UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
+if (row.numFields() > (Integer.MAX_VALUE - initialSize - 
bitsetWidthInBytes) / 8) {
+  throw new UnsupportedOperationException(
+"Cannot create BufferHolder for input UnsafeRow because there are " +
+  "too many fields (number of fields: " + row.numFields() + ")");
+}
+this.fixedSize = bitsetWidthInBytes + 8 * row.numFields();
 this.buffer = new byte[fixedSize + initialSize];
 this.row = row;
 this.row.pointTo(buffer, buffer.length);
@@ -55,10 +61,16 @@ public class BufferHolder {
* Grows the buffer by at least neededSize and points the row to the buffer.
*/
   public void grow(int neededSize) {
+if (neededSize > Integer.MAX_VALUE - totalSize()) {
+  throw new UnsupportedOperationException(
+"Cannot grow BufferHolder by size " + neededSize + " because the size 
after growing " +
+  "exceeds size limitation " + Integer.MAX_VALUE);
+}
 final int length = totalSize() + neededSize;
 if (buffer.length < length) {
   // This will not happen frequently, because the buffer is re-used.
-  final byte[] tmp = new byte[length * 2];
+  int newLength = length < Integer.MAX_VALUE / 2 ? length * 2 : 
Integer.MAX_VALUE;
+  final byte[] tmp = new byte[newLength];
   Platform.copyMemory(
 buffer,
 Platform.BYTE_ARRAY_OFFSET,

http://git-wip-us.apache.org/repos/asf/spark/blob/56207fc3/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
new file mode 100644
index 000..c7c386b
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the 

spark git commit: [SPARK-16071][SQL] Checks size limit when doubling the array size in BufferHolder

2016-06-30 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master de8ab313e -> 5320adc86


[SPARK-16071][SQL] Checks size limit when doubling the array size in 
BufferHolder

## What changes were proposed in this pull request?

This PR checks the size limit when doubling the array size in BufferHolder to 
avoid integer overflow.
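
A standalone sketch of the overflow-safe growth rule this patch introduces (not Spark's 
actual BufferHolder code, only the arithmetic it guards):

```scala
// Returns the new backing-array length: reject requests that would overflow,
// then double when possible but never exceed Int.MaxValue.
def grownLength(currentSize: Int, neededSize: Int): Int = {
  require(neededSize <= Int.MaxValue - currentSize,
    s"cannot grow by $neededSize: the size after growing exceeds ${Int.MaxValue}")
  val length = currentSize + neededSize
  if (length < Int.MaxValue / 2) length * 2 else Int.MaxValue
}
```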

## How was this patch tested?

Manual test.

Author: Sean Zhong 

Closes #13829 from clockfly/SPARK-16071_2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5320adc8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5320adc8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5320adc8

Branch: refs/heads/master
Commit: 5320adc863ca85b489cef79f156392b9da36e53f
Parents: de8ab31
Author: Sean Zhong 
Authored: Thu Jun 30 21:56:34 2016 +0800
Committer: Wenchen Fan 
Committed: Thu Jun 30 21:56:34 2016 +0800

--
 .../expressions/codegen/BufferHolder.java   | 16 +++-
 .../expressions/codegen/BufferHolderSuite.scala | 39 
 2 files changed, 53 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5320adc8/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
--
diff --git 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
index af61e20..0e4264f 100644
--- 
a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
+++ 
b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolder.java
@@ -45,7 +45,13 @@ public class BufferHolder {
   }
 
   public BufferHolder(UnsafeRow row, int initialSize) {
-this.fixedSize = UnsafeRow.calculateBitSetWidthInBytes(row.numFields()) + 
8 * row.numFields();
+int bitsetWidthInBytes = 
UnsafeRow.calculateBitSetWidthInBytes(row.numFields());
+if (row.numFields() > (Integer.MAX_VALUE - initialSize - 
bitsetWidthInBytes) / 8) {
+  throw new UnsupportedOperationException(
+"Cannot create BufferHolder for input UnsafeRow because there are " +
+  "too many fields (number of fields: " + row.numFields() + ")");
+}
+this.fixedSize = bitsetWidthInBytes + 8 * row.numFields();
 this.buffer = new byte[fixedSize + initialSize];
 this.row = row;
 this.row.pointTo(buffer, buffer.length);
@@ -55,10 +61,16 @@ public class BufferHolder {
* Grows the buffer by at least neededSize and points the row to the buffer.
*/
   public void grow(int neededSize) {
+if (neededSize > Integer.MAX_VALUE - totalSize()) {
+  throw new UnsupportedOperationException(
+"Cannot grow BufferHolder by size " + neededSize + " because the size 
after growing " +
+  "exceeds size limitation " + Integer.MAX_VALUE);
+}
 final int length = totalSize() + neededSize;
 if (buffer.length < length) {
   // This will not happen frequently, because the buffer is re-used.
-  final byte[] tmp = new byte[length * 2];
+  int newLength = length < Integer.MAX_VALUE / 2 ? length * 2 : 
Integer.MAX_VALUE;
+  final byte[] tmp = new byte[newLength];
   Platform.copyMemory(
 buffer,
 Platform.BYTE_ARRAY_OFFSET,

http://git-wip-us.apache.org/repos/asf/spark/blob/5320adc8/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
new file mode 100644
index 000..c7c386b
--- /dev/null
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/BufferHolderSuite.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the 

spark git commit: [SPARK-12177][TEST] Removed test to avoid compilation issue in scala 2.10

2016-06-30 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 1d274455c -> 6a4f4c1d7


[SPARK-12177][TEST] Removed test to avoid compilation issue in scala 2.10

## What changes were proposed in this pull request?

The commented lines failed the Scala 2.10 build. This is because of a change in the 
behavior of case classes between 2.10 and 2.11: in Scala 2.10, if the companion object 
of a case class explicitly defines apply(), then the compiler-generated apply method is 
not generated, while in Scala 2.11 it is. Hence, the lines compile fine in 2.11 but not 
in 2.10.

This simply comments out the tests to fix the broken build. A correct solution is pending.

Author: Tathagata Das 

Closes #13992 from tdas/SPARK-12177.

(cherry picked from commit de8ab313e1fe59f849a62e59349224581ff0b40a)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6a4f4c1d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6a4f4c1d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6a4f4c1d

Branch: refs/heads/branch-2.0
Commit: 6a4f4c1d751db9542ba49755e859b55b42be3236
Parents: 1d27445
Author: Tathagata Das 
Authored: Thu Jun 30 18:06:04 2016 +0800
Committer: Cheng Lian 
Committed: Thu Jun 30 18:06:20 2016 +0800

--
 .../spark/streaming/kafka010/JavaConsumerStrategySuite.java  | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6a4f4c1d/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
--
diff --git 
a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
index aba45f5..8d7c05b 100644
--- 
a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
+++ 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -50,8 +50,8 @@ public class JavaConsumerStrategySuite implements 
Serializable {
   JavaConverters.mapAsScalaMapConverter(offsets).asScala();
 
 // make sure constructors can be called from java
-final ConsumerStrategy sub0 =
-  Subscribe.apply(topics, kafkaParams, offsets);
+// final ConsumerStrategy sub0 =  // does not 
compile in Scala 2.10
+//   Subscribe.apply(topics, kafkaParams, offsets);
 final ConsumerStrategy sub1 =
   Subscribe.apply(sTopics, sKafkaParams, sOffsets);
 final ConsumerStrategy sub2 =
@@ -65,8 +65,8 @@ public class JavaConsumerStrategySuite implements 
Serializable {
   sub1.executorKafkaParams().get("bootstrap.servers"),
   sub3.executorKafkaParams().get("bootstrap.servers"));
 
-final ConsumerStrategy asn0 =
-  Assign.apply(parts, kafkaParams, offsets);
+// final ConsumerStrategy asn0 =  // does not 
compile in Scala 2.10
+//   Assign.apply(parts, kafkaParams, offsets);
 final ConsumerStrategy asn1 =
   Assign.apply(sParts, sKafkaParams, sOffsets);
 final ConsumerStrategy asn2 =





spark git commit: [SPARK-12177][TEST] Removed test to avoid compilation issue in scala 2.10

2016-06-30 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master b30a2dc7c -> de8ab313e


[SPARK-12177][TEST] Removed test to avoid compilation issue in scala 2.10

## What changes were proposed in this pull request?

The commented lines failed the Scala 2.10 build. This is because of a change in the 
behavior of case classes between 2.10 and 2.11: in Scala 2.10, if the companion object 
of a case class explicitly defines apply(), then the compiler-generated apply method is 
not generated, while in Scala 2.11 it is. Hence, the lines compile fine in 2.11 but not 
in 2.10.

This simply comments out the tests to fix the broken build. A correct solution is pending.
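
A schematic illustration of the difference, using a hypothetical `Box` case class (the 
2.10 behavior is taken from the description above, not re-verified here):

```scala
case class Box(value: Int)

object Box {
  // Explicitly defined apply in the companion object.
  def apply(value: String): Box = new Box(value.toInt)
}

object Demo {
  // Uses the synthetic Box.apply(Int): compiles in Scala 2.11, but per the
  // description above it is not generated in 2.10 once apply(String) exists.
  val fromInt = Box(1)
  // Uses the explicit apply(String): compiles in both versions.
  val fromString = Box("2")
}
```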

Author: Tathagata Das 

Closes #13992 from tdas/SPARK-12177.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/de8ab313
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/de8ab313
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/de8ab313

Branch: refs/heads/master
Commit: de8ab313e1fe59f849a62e59349224581ff0b40a
Parents: b30a2dc
Author: Tathagata Das 
Authored: Thu Jun 30 18:06:04 2016 +0800
Committer: Cheng Lian 
Committed: Thu Jun 30 18:06:04 2016 +0800

--
 .../spark/streaming/kafka010/JavaConsumerStrategySuite.java  | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/de8ab313/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
--
diff --git 
a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
index aba45f5..8d7c05b 100644
--- 
a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
+++ 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -50,8 +50,8 @@ public class JavaConsumerStrategySuite implements 
Serializable {
   JavaConverters.mapAsScalaMapConverter(offsets).asScala();
 
 // make sure constructors can be called from java
-final ConsumerStrategy sub0 =
-  Subscribe.apply(topics, kafkaParams, offsets);
+// final ConsumerStrategy sub0 =  // does not 
compile in Scala 2.10
+//   Subscribe.apply(topics, kafkaParams, offsets);
 final ConsumerStrategy sub1 =
   Subscribe.apply(sTopics, sKafkaParams, sOffsets);
 final ConsumerStrategy sub2 =
@@ -65,8 +65,8 @@ public class JavaConsumerStrategySuite implements 
Serializable {
   sub1.executorKafkaParams().get("bootstrap.servers"),
   sub3.executorKafkaParams().get("bootstrap.servers"));
 
-final ConsumerStrategy asn0 =
-  Assign.apply(parts, kafkaParams, offsets);
+// final ConsumerStrategy asn0 =  // does not 
compile in Scala 2.10
+//   Assign.apply(parts, kafkaParams, offsets);
 final ConsumerStrategy asn1 =
   Assign.apply(sParts, sKafkaParams, sOffsets);
 final ConsumerStrategy asn2 =





spark git commit: [SPARK-16241][ML] model loading backward compatibility for ml NaiveBayes

2016-06-30 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/master 2c3d96134 -> b30a2dc7c


[SPARK-16241][ML] model loading backward compatibility for ml NaiveBayes

## What changes were proposed in this pull request?

model loading backward compatibility for ml NaiveBayes

## How was this patch tested?

Existing unit tests and a manual test for loading models saved by Spark 1.6.
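
The load path being exercised is the standard MLReadable one; a sketch, with a 
placeholder path to a model directory written by Spark 1.6 (run, for example, from 
spark-shell so a SparkSession is active):

```scala
import org.apache.spark.ml.classification.NaiveBayesModel

// Path is a placeholder; with this patch the old mllib Vector/Matrix columns in
// the saved data are converted to their ml counterparts while loading.
val model = NaiveBayesModel.load("/path/to/naive-bayes-model-saved-by-1.6")
```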

Author: zlpmichelle 

Closes #13940 from zlpmichelle/naivebayes.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b30a2dc7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b30a2dc7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b30a2dc7

Branch: refs/heads/master
Commit: b30a2dc7c50bfb70bd2b57be70530a9a9fa94a7a
Parents: 2c3d961
Author: zlpmichelle 
Authored: Thu Jun 30 00:50:14 2016 -0700
Committer: Yanbo Liang 
Committed: Thu Jun 30 00:50:14 2016 -0700

--
 .../org/apache/spark/ml/classification/NaiveBayes.scala  | 11 +++
 1 file changed, 7 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/b30a2dc7/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
index 7c34031..c99ae30 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -28,8 +28,9 @@ import org.apache.spark.ml.util._
 import org.apache.spark.mllib.classification.{NaiveBayes => OldNaiveBayes}
 import org.apache.spark.mllib.classification.{NaiveBayesModel => 
OldNaiveBayesModel}
 import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
+import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.{Dataset, Row}
 
 /**
  * Params for Naive Bayes Classifiers.
@@ -275,9 +276,11 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] 
{
   val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
 
   val dataPath = new Path(path, "data").toString
-  val data = sparkSession.read.parquet(dataPath).select("pi", 
"theta").head()
-  val pi = data.getAs[Vector](0)
-  val theta = data.getAs[Matrix](1)
+  val data = sparkSession.read.parquet(dataPath)
+  val vecConverted = MLUtils.convertVectorColumnsToML(data, "pi")
+  val Row(pi: Vector, theta: Matrix) = 
MLUtils.convertMatrixColumnsToML(vecConverted, "theta")
+.select("pi", "theta")
+.head()
   val model = new NaiveBayesModel(metadata.uid, pi, theta)
 
   DefaultParamsReader.getAndSetParams(model, metadata)





spark git commit: [SPARK-16241][ML] model loading backward compatibility for ml NaiveBayes

2016-06-30 Thread yliang
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 c8a7c2305 -> 1d274455c


[SPARK-16241][ML] model loading backward compatibility for ml NaiveBayes

## What changes were proposed in this pull request?

model loading backward compatibility for ml NaiveBayes

## How was this patch tested?

Existing unit tests and a manual test for loading models saved by Spark 1.6.

Author: zlpmichelle 

Closes #13940 from zlpmichelle/naivebayes.

(cherry picked from commit b30a2dc7c50bfb70bd2b57be70530a9a9fa94a7a)
Signed-off-by: Yanbo Liang 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1d274455
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1d274455
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1d274455

Branch: refs/heads/branch-2.0
Commit: 1d274455cfa45bc63aee6ecf8dbb1f170ee16af2
Parents: c8a7c23
Author: zlpmichelle 
Authored: Thu Jun 30 00:50:14 2016 -0700
Committer: Yanbo Liang 
Committed: Thu Jun 30 00:50:35 2016 -0700

--
 .../org/apache/spark/ml/classification/NaiveBayes.scala  | 11 +++
 1 file changed, 7 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1d274455/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
--
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala 
b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
index 7c34031..c99ae30 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/classification/NaiveBayes.scala
@@ -28,8 +28,9 @@ import org.apache.spark.ml.util._
 import org.apache.spark.mllib.classification.{NaiveBayes => OldNaiveBayes}
 import org.apache.spark.mllib.classification.{NaiveBayesModel => 
OldNaiveBayesModel}
 import org.apache.spark.mllib.regression.{LabeledPoint => OldLabeledPoint}
+import org.apache.spark.mllib.util.MLUtils
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.Dataset
+import org.apache.spark.sql.{Dataset, Row}
 
 /**
  * Params for Naive Bayes Classifiers.
@@ -275,9 +276,11 @@ object NaiveBayesModel extends MLReadable[NaiveBayesModel] 
{
   val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
 
   val dataPath = new Path(path, "data").toString
-  val data = sparkSession.read.parquet(dataPath).select("pi", 
"theta").head()
-  val pi = data.getAs[Vector](0)
-  val theta = data.getAs[Matrix](1)
+  val data = sparkSession.read.parquet(dataPath)
+  val vecConverted = MLUtils.convertVectorColumnsToML(data, "pi")
+  val Row(pi: Vector, theta: Matrix) = 
MLUtils.convertMatrixColumnsToML(vecConverted, "theta")
+.select("pi", "theta")
+.head()
   val model = new NaiveBayesModel(metadata.uid, pi, theta)
 
   DefaultParamsReader.getAndSetParams(model, metadata)





spark git commit: [SPARK-16257][BUILD] Update spark_ec2.py to support Spark 1.6.2 and 1.6.3.

2016-06-30 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-1.6 1ac830aca -> ccc7fa357


[SPARK-16257][BUILD] Update spark_ec2.py to support Spark 1.6.2 and 1.6.3.

## What changes were proposed in this pull request?

- Adds 1.6.2 and 1.6.3 as supported Spark versions within the bundled spark-ec2 
script.
- Makes the default Spark version 1.6.3 to keep in sync with the upcoming 
release.
- Does not touch the newer spark-ec2 scripts in the separate amplab repository.

## How was this patch tested?

- Manual script execution:

export AWS_SECRET_ACCESS_KEY=_snip_
export AWS_ACCESS_KEY_ID=_snip_
$SPARK_HOME/ec2/spark-ec2 \
--key-pair=_snip_ \
--identity-file=_snip_ \
--region=us-east-1 \
--vpc-id=_snip_ \
--slaves=1 \
--instance-type=t1.micro \
--spark-version=1.6.2 \
--hadoop-major-version=yarn \
launch test-cluster

- Result: Successful creation of a 1.6.2-based Spark cluster.

This contribution is my original work and I license the work to the project 
under the project's open source license.

Author: Brian Uri 

Closes #13947 from briuri/branch-1.6-bug-spark-16257.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ccc7fa35
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ccc7fa35
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ccc7fa35

Branch: refs/heads/branch-1.6
Commit: ccc7fa357099e0f621cfc02448ba20d3f6fabc14
Parents: 1ac830a
Author: Brian Uri 
Authored: Thu Jun 30 07:52:28 2016 +0100
Committer: Sean Owen 
Committed: Thu Jun 30 07:52:28 2016 +0100

--
 ec2/spark_ec2.py | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ccc7fa35/ec2/spark_ec2.py
--
diff --git a/ec2/spark_ec2.py b/ec2/spark_ec2.py
index 76c09f0..b28b4c5 100755
--- a/ec2/spark_ec2.py
+++ b/ec2/spark_ec2.py
@@ -51,7 +51,7 @@ else:
 raw_input = input
 xrange = range
 
-SPARK_EC2_VERSION = "1.6.1"
+SPARK_EC2_VERSION = "1.6.3"
 SPARK_EC2_DIR = os.path.dirname(os.path.realpath(__file__))
 
 VALID_SPARK_VERSIONS = set([
@@ -77,6 +77,8 @@ VALID_SPARK_VERSIONS = set([
 "1.5.2",
 "1.6.0",
 "1.6.1",
+"1.6.2",
+"1.6.3",
 ])
 
 SPARK_TACHYON_MAP = {
@@ -96,6 +98,8 @@ SPARK_TACHYON_MAP = {
 "1.5.2": "0.7.1",
 "1.6.0": "0.8.2",
 "1.6.1": "0.8.2",
+"1.6.2": "0.8.2",
+"1.6.3": "0.8.2",
 }
 
 DEFAULT_SPARK_VERSION = SPARK_EC2_VERSION
@@ -103,7 +107,7 @@ DEFAULT_SPARK_GITHUB_REPO = 
"https://github.com/apache/spark"
 
 # Default location to get the spark-ec2 scripts (and ami-list) from
 DEFAULT_SPARK_EC2_GITHUB_REPO = "https://github.com/amplab/spark-ec2"
-DEFAULT_SPARK_EC2_BRANCH = "branch-1.5"
+DEFAULT_SPARK_EC2_BRANCH = "branch-1.6"
 
 
 def setup_external_libs(libs):





spark git commit: [SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide

2016-06-30 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 3134f116a -> c8a7c2305


[SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide

Author: Tathagata Das 

Closes #13978 from tdas/SPARK-16256-1.

(cherry picked from commit 2c3d96134dcc0428983eea087db7e91072215aea)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8a7c230
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8a7c230
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8a7c230

Branch: refs/heads/branch-2.0
Commit: c8a7c23054209db5474d96de2a7e2d8a6f8cc0da
Parents: 3134f11
Author: Tathagata Das 
Authored: Wed Jun 29 23:38:19 2016 -0700
Committer: Tathagata Das 
Committed: Wed Jun 29 23:38:35 2016 -0700

--
 docs/structured-streaming-programming-guide.md | 44 +++--
 1 file changed, 23 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c8a7c230/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 9ed06be..5932566 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -459,7 +459,7 @@ val csvDF = spark
 .readStream
 .option("sep", ";")
 .schema(userSchema)  // Specify schema of the parquet files
-.csv("/path/to/directory")// Equivalent to 
format("cv").load("/path/to/directory")
+.csv("/path/to/directory")// Equivalent to 
format("csv").load("/path/to/directory")
 {% endhighlight %}
 
 
@@ -486,7 +486,7 @@ Dataset[Row] csvDF = spark
 .readStream()
 .option("sep", ";")
 .schema(userSchema)  // Specify schema of the parquet files
-.csv("/path/to/directory");// Equivalent to 
format("cv").load("/path/to/directory")
+.csv("/path/to/directory");// Equivalent to 
format("csv").load("/path/to/directory")
 {% endhighlight %}
 
 
@@ -513,7 +513,7 @@ csvDF = spark \
 .readStream() \
 .option("sep", ";") \
 .schema(userSchema) \
-.csv("/path/to/directory")# Equivalent to 
format("cv").load("/path/to/directory")
+.csv("/path/to/directory")# Equivalent to 
format("csv").load("/path/to/directory")
 {% endhighlight %}
 
 
@@ -522,10 +522,10 @@ csvDF = spark \
 These examples generate streaming DataFrames that are untyped, meaning that 
the schema of the DataFrame is not checked at compile time, only checked at 
runtime when the query is submitted. Some operations like `map`, `flatMap`, 
etc. need the type to be known at compile time. To do those, you can convert 
these untyped streaming DataFrames to typed streaming Datasets using the same 
methods as static DataFrame. See the SQL Programming Guide for more details. 
Additionally, more details on the supported streaming sources are discussed 
later in the document.
 
 ## Operations on streaming DataFrames/Datasets
-You can apply all kinds of operations on streaming DataFrames/Datasets - 
ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to 
typed RDD-like operations (e.g. map, filter, flatMap). See the SQL programming 
guide for more details. Let’s take a look at a few example operations that 
you can use.
+You can apply all kinds of operations on streaming DataFrames/Datasets - 
ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to 
typed RDD-like operations (e.g. map, filter, flatMap). See the [SQL programming 
guide](sql-programming-guide.html) for more details. Let’s take a look at a 
few example operations that you can use.
 
 ### Basic Operations - Selection, Projection, Aggregation
-Most of the common operations on DataFrame/Dataset are supported for 
streaming. The few operations that are not supported are discussed later in 
this section.
+Most of the common operations on DataFrame/Dataset are supported for 
streaming. The few operations that are not supported are [discussed 
later](#unsupported-operations) in this section.
 
 
 
@@ -618,7 +618,7 @@ df.groupBy("type").count()
 
 
 ### Window Operations on Event Time
-Aggregations over a sliding event-time window are straightforward with 
Structured Streaming. The key idea to understand about window-based 
aggregations are very similar to grouped aggregations. In a grouped 
aggregation, aggregate values (e.g. counts) are maintained for each unique 
value in the user-specified grouping column. In case of, window-based 
aggregations, aggregate values are maintained for each window the event-time of 
a row falls into. Let's understand this 
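
The guide text quoted (and truncated) above also mentions converting untyped streaming DataFrames to typed Datasets; a minimal sketch of that conversion (the case class, schema derivation, and JSON path are assumptions for illustration):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.{Encoders, SparkSession}

case class DeviceData(device: String, signal: Double, time: Timestamp)

val spark = SparkSession.builder().appName("TypedStream").getOrCreate()
import spark.implicits._

// Untyped streaming DataFrame read from JSON files (path is a placeholder) ...
val df = spark.readStream
  .schema(Encoders.product[DeviceData].schema)
  .json("/path/to/device/json")

// ... converted to a typed streaming Dataset so that map/filter are checked at compile time.
val ds = df.as[DeviceData]
val strongSignals = ds.filter(_.signal > 10.0).map(_.device)
```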

spark git commit: [SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide

2016-06-30 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master dedbceec1 -> 2c3d96134


[SPARK-16256][DOCS] Minor fixes on the Structured Streaming Programming Guide

Author: Tathagata Das 

Closes #13978 from tdas/SPARK-16256-1.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c3d9613
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c3d9613
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c3d9613

Branch: refs/heads/master
Commit: 2c3d96134dcc0428983eea087db7e91072215aea
Parents: dedbcee
Author: Tathagata Das 
Authored: Wed Jun 29 23:38:19 2016 -0700
Committer: Tathagata Das 
Committed: Wed Jun 29 23:38:19 2016 -0700

--
 docs/structured-streaming-programming-guide.md | 44 +++--
 1 file changed, 23 insertions(+), 21 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2c3d9613/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 9ed06be..5932566 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -459,7 +459,7 @@ val csvDF = spark
 .readStream
 .option("sep", ";")
 .schema(userSchema)  // Specify schema of the parquet files
-.csv("/path/to/directory")// Equivalent to 
format("cv").load("/path/to/directory")
+.csv("/path/to/directory")// Equivalent to 
format("csv").load("/path/to/directory")
 {% endhighlight %}
 
 
@@ -486,7 +486,7 @@ Dataset[Row] csvDF = spark
 .readStream()
 .option("sep", ";")
 .schema(userSchema)  // Specify schema of the parquet files
-.csv("/path/to/directory");// Equivalent to 
format("cv").load("/path/to/directory")
+.csv("/path/to/directory");// Equivalent to 
format("csv").load("/path/to/directory")
 {% endhighlight %}
 
 
@@ -513,7 +513,7 @@ csvDF = spark \
 .readStream() \
 .option("sep", ";") \
 .schema(userSchema) \
-.csv("/path/to/directory")# Equivalent to 
format("cv").load("/path/to/directory")
+.csv("/path/to/directory")# Equivalent to 
format("csv").load("/path/to/directory")
 {% endhighlight %}
 
 
@@ -522,10 +522,10 @@ csvDF = spark \
 These examples generate streaming DataFrames that are untyped, meaning that 
the schema of the DataFrame is not checked at compile time, only checked at 
runtime when the query is submitted. Some operations like `map`, `flatMap`, 
etc. need the type to be known at compile time. To do those, you can convert 
these untyped streaming DataFrames to typed streaming Datasets using the same 
methods as static DataFrame. See the SQL Programming Guide for more details. 
Additionally, more details on the supported streaming sources are discussed 
later in the document.
 
 ## Operations on streaming DataFrames/Datasets
-You can apply all kinds of operations on streaming DataFrames/Datasets - 
ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to 
typed RDD-like operations (e.g. map, filter, flatMap). See the SQL programming 
guide for more details. Let’s take a look at a few example operations that 
you can use.
+You can apply all kinds of operations on streaming DataFrames/Datasets - 
ranging from untyped, SQL-like operations (e.g. select, where, groupBy), to 
typed RDD-like operations (e.g. map, filter, flatMap). See the [SQL programming 
guide](sql-programming-guide.html) for more details. Let’s take a look at a 
few example operations that you can use.
 
 ### Basic Operations - Selection, Projection, Aggregation
-Most of the common operations on DataFrame/Dataset are supported for 
streaming. The few operations that are not supported are discussed later in 
this section.
+Most of the common operations on DataFrame/Dataset are supported for 
streaming. The few operations that are not supported are [discussed 
later](#unsupported-operations) in this section.
 
 
 
@@ -618,7 +618,7 @@ df.groupBy("type").count()
 
 
 ### Window Operations on Event Time
-Aggregations over a sliding event-time window are straightforward with 
Structured Streaming. The key idea to understand about window-based 
aggregations are very similar to grouped aggregations. In a grouped 
aggregation, aggregate values (e.g. counts) are maintained for each unique 
value in the user-specified grouping column. In case of, window-based 
aggregations, aggregate values are maintained for each window the event-time of 
a row falls into. Let's understand this with an illustration. 
+Aggregations over a sliding event-time window are straightforward with 
Structured Streaming. The key idea to 
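
The section quoted (and truncated) above introduces window operations on event time. A minimal runnable sketch of such a windowed aggregation; the socket source, host/port, and the use of processing time as a stand-in for a real event-time column are all assumptions for illustration:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{current_timestamp, explode, split, window}

val spark = SparkSession.builder().appName("WindowedCounts").getOrCreate()
import spark.implicits._

// Lines from a socket source; a real source would carry its own event-time column.
val words = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()
  .withColumn("timestamp", current_timestamp())
  .select(explode(split($"value", " ")).as("word"), $"timestamp")

// One count per (10-minute window, word), with windows sliding every 5 minutes.
val windowedCounts = words
  .groupBy(window($"timestamp", "10 minutes", "5 minutes"), $"word")
  .count()

windowedCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
  .awaitTermination()
```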

[1/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API

2016-06-30 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 a54852350 -> 3134f116a


http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
--
diff --git 
a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
new file mode 100644
index 000..aba45f5
--- /dev/null
+++ 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.collection.JavaConverters;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaConsumerStrategySuite implements Serializable {
+
+  @Test
+  public void testConsumerStrategyConstructors() {
+    final String topic1 = "topic1";
+    final Collection<String> topics = Arrays.asList(topic1);
+    final scala.collection.Iterable<String> sTopics =
+      JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
+    final TopicPartition tp1 = new TopicPartition(topic1, 0);
+    final TopicPartition tp2 = new TopicPartition(topic1, 1);
+    final Collection<TopicPartition> parts = Arrays.asList(tp1, tp2);
+    final scala.collection.Iterable<TopicPartition> sParts =
+      JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
+    final Map<String, Object> kafkaParams = new HashMap<String, Object>();
+    kafkaParams.put("bootstrap.servers", "not used");
+    final scala.collection.Map<String, Object> sKafkaParams =
+      JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
+    final Map<TopicPartition, Long> offsets = new HashMap<>();
+    offsets.put(tp1, 23L);
+    final scala.collection.Map<TopicPartition, Long> sOffsets =
+      JavaConverters.mapAsScalaMapConverter(offsets).asScala();
+
+    // make sure constructors can be called from java
+    final ConsumerStrategy<String, String> sub0 =
+      Subscribe.apply(topics, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> sub1 =
+      Subscribe.apply(sTopics, sKafkaParams, sOffsets);
+    final ConsumerStrategy<String, String> sub2 =
+      Subscribe.apply(sTopics, sKafkaParams);
+    final ConsumerStrategy<String, String> sub3 =
+      Subscribe.create(topics, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> sub4 =
+      Subscribe.create(topics, kafkaParams);
+
+    Assert.assertEquals(
+      sub1.executorKafkaParams().get("bootstrap.servers"),
+      sub3.executorKafkaParams().get("bootstrap.servers"));
+
+    final ConsumerStrategy<String, String> asn0 =
+      Assign.apply(parts, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> asn1 =
+      Assign.apply(sParts, sKafkaParams, sOffsets);
+    final ConsumerStrategy<String, String> asn2 =
+      Assign.apply(sParts, sKafkaParams);
+    final ConsumerStrategy<String, String> asn3 =
+      Assign.create(parts, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> asn4 =
+      Assign.create(parts, kafkaParams);
+
+    Assert.assertEquals(
+      asn1.executorKafkaParams().get("bootstrap.servers"),
+      asn3.executorKafkaParams().get("bootstrap.servers"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
--
diff --git 
a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
new file mode 100644
index 000..e57ede7
--- /dev/null
+++ 

[2/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API

2016-06-30 Thread tdas
[SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer 
API

## What changes were proposed in this pull request?

New Kafka consumer API for the released 0.10 version of Kafka

## How was this patch tested?

Unit tests, manual tests
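
As a rough Scala sketch of how the new consumer strategies are constructed (mirroring the `JavaConsumerStrategySuite` added by this patch; the broker address, group id, and topic names are placeholders, and the resulting strategy would then be handed to the new `KafkaUtils` entry point):

```scala
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.{Assign, Subscribe}

// Standard new-consumer configuration; values here are placeholders.
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
  "group.id" -> "example-group")

// Subscribe to topics by name and let Kafka assign partitions ...
val byTopic = Subscribe[String, String](Iterable("topicA", "topicB"), kafkaParams)

// ... or pin the exact topic-partitions to consume.
val byPartition = Assign[String, String](
  Iterable(new TopicPartition("topicA", 0), new TopicPartition("topicA", 1)),
  kafkaParams)
```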

Author: cody koeninger 

Closes #11863 from koeninger/kafka-0.9.

(cherry picked from commit dedbceec1ef33ccd88101016de969a1ef3e3e142)
Signed-off-by: Tathagata Das 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3134f116
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3134f116
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3134f116

Branch: refs/heads/branch-2.0
Commit: 3134f116a3565c3a299fa2e7094acd7304d64280
Parents: a548523
Author: cody koeninger 
Authored: Wed Jun 29 23:21:03 2016 -0700
Committer: Tathagata Das 
Committed: Wed Jun 29 23:21:15 2016 -0700

--
 external/kafka-0-10-assembly/pom.xml| 176 ++
 external/kafka-0-10/pom.xml |  98 +++
 .../kafka010/CachedKafkaConsumer.scala  | 189 ++
 .../streaming/kafka010/ConsumerStrategy.scala   | 314 ++
 .../kafka010/DirectKafkaInputDStream.scala  | 318 ++
 .../spark/streaming/kafka010/KafkaRDD.scala | 232 +++
 .../streaming/kafka010/KafkaRDDPartition.scala  |  45 ++
 .../streaming/kafka010/KafkaTestUtils.scala | 277 +
 .../spark/streaming/kafka010/KafkaUtils.scala   | 175 ++
 .../streaming/kafka010/LocationStrategy.scala   |  77 +++
 .../spark/streaming/kafka010/OffsetRange.scala  | 153 +
 .../spark/streaming/kafka010/package-info.java  |  21 +
 .../spark/streaming/kafka010/package.scala  |  23 +
 .../kafka010/JavaConsumerStrategySuite.java |  84 +++
 .../kafka010/JavaDirectKafkaStreamSuite.java| 180 ++
 .../streaming/kafka010/JavaKafkaRDDSuite.java   | 122 
 .../kafka010/JavaLocationStrategySuite.java |  58 ++
 .../src/test/resources/log4j.properties |  28 +
 .../kafka010/DirectKafkaStreamSuite.scala   | 612 +++
 .../streaming/kafka010/KafkaRDDSuite.scala  | 169 +
 pom.xml |   2 +
 project/SparkBuild.scala|  12 +-
 22 files changed, 3359 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3134f116/external/kafka-0-10-assembly/pom.xml
--
diff --git a/external/kafka-0-10-assembly/pom.xml 
b/external/kafka-0-10-assembly/pom.xml
new file mode 100644
index 000..f2468d1
--- /dev/null
+++ b/external/kafka-0-10-assembly/pom.xml
@@ -0,0 +1,176 @@
+
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+  4.0.0
+  
+org.apache.spark
+spark-parent_2.11
+2.0.0-SNAPSHOT
+../../pom.xml
+  
+
+  org.apache.spark
+  spark-streaming-kafka-0-10-assembly_2.11
+  jar
+  Spark Integration for Kafka 0.10 Assembly
+  http://spark.apache.org/
+
+  
+streaming-kafka-0-10-assembly
+  
+
+  
+
+  org.apache.spark
+  
spark-streaming-kafka-0-10_${scala.binary.version}
+  ${project.version}
+
+
+  org.apache.spark
+  spark-streaming_${scala.binary.version}
+  ${project.version}
+  provided
+
+
+
+  commons-codec
+  commons-codec
+  provided
+
+
+  commons-lang
+  commons-lang
+  provided
+
+
+  com.google.protobuf
+  protobuf-java
+  provided
+
+
+  net.jpountz.lz4
+  lz4
+  provided
+
+
+  org.apache.hadoop
+  hadoop-client
+  provided
+
+
+  org.apache.avro
+  avro-mapred
+  ${avro.mapred.classifier}
+  provided
+
+
+  org.apache.curator
+  curator-recipes
+  provided
+
+
+  org.apache.zookeeper
+  zookeeper
+  provided
+
+
+  log4j
+  log4j
+  provided
+
+
+  net.java.dev.jets3t
+  jets3t
+  provided
+
+
+  org.scala-lang
+  scala-library
+  provided
+
+
+  org.slf4j
+  slf4j-api
+  provided
+
+
+  org.slf4j
+  slf4j-log4j12
+  provided
+
+
+  org.xerial.snappy
+  snappy-java
+  provided
+
+  
+
+  
+  
target/scala-${scala.binary.version}/classes
+  
target/scala-${scala.binary.version}/test-classes
+  
+
+  org.apache.maven.plugins
+  maven-shade-plugin
+  
+false
+
+  
+*:*
+  
+
+
+  
+*:*
+
+   

[1/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API

2016-06-30 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master bde1d6a61 -> dedbceec1


http://git-wip-us.apache.org/repos/asf/spark/blob/dedbceec/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
--
diff --git 
a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
new file mode 100644
index 000..aba45f5
--- /dev/null
+++ 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaConsumerStrategySuite.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.streaming.kafka010;
+
+import java.io.Serializable;
+import java.util.*;
+
+import scala.collection.JavaConverters;
+
+import org.apache.kafka.common.TopicPartition;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+public class JavaConsumerStrategySuite implements Serializable {
+
+  @Test
+  public void testConsumerStrategyConstructors() {
+    final String topic1 = "topic1";
+    final Collection<String> topics = Arrays.asList(topic1);
+    final scala.collection.Iterable<String> sTopics =
+      JavaConverters.collectionAsScalaIterableConverter(topics).asScala();
+    final TopicPartition tp1 = new TopicPartition(topic1, 0);
+    final TopicPartition tp2 = new TopicPartition(topic1, 1);
+    final Collection<TopicPartition> parts = Arrays.asList(tp1, tp2);
+    final scala.collection.Iterable<TopicPartition> sParts =
+      JavaConverters.collectionAsScalaIterableConverter(parts).asScala();
+    final Map<String, Object> kafkaParams = new HashMap<String, Object>();
+    kafkaParams.put("bootstrap.servers", "not used");
+    final scala.collection.Map<String, Object> sKafkaParams =
+      JavaConverters.mapAsScalaMapConverter(kafkaParams).asScala();
+    final Map<TopicPartition, Long> offsets = new HashMap<>();
+    offsets.put(tp1, 23L);
+    final scala.collection.Map<TopicPartition, Long> sOffsets =
+      JavaConverters.mapAsScalaMapConverter(offsets).asScala();
+
+    // make sure constructors can be called from java
+    final ConsumerStrategy<String, String> sub0 =
+      Subscribe.apply(topics, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> sub1 =
+      Subscribe.apply(sTopics, sKafkaParams, sOffsets);
+    final ConsumerStrategy<String, String> sub2 =
+      Subscribe.apply(sTopics, sKafkaParams);
+    final ConsumerStrategy<String, String> sub3 =
+      Subscribe.create(topics, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> sub4 =
+      Subscribe.create(topics, kafkaParams);
+
+    Assert.assertEquals(
+      sub1.executorKafkaParams().get("bootstrap.servers"),
+      sub3.executorKafkaParams().get("bootstrap.servers"));
+
+    final ConsumerStrategy<String, String> asn0 =
+      Assign.apply(parts, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> asn1 =
+      Assign.apply(sParts, sKafkaParams, sOffsets);
+    final ConsumerStrategy<String, String> asn2 =
+      Assign.apply(sParts, sKafkaParams);
+    final ConsumerStrategy<String, String> asn3 =
+      Assign.create(parts, kafkaParams, offsets);
+    final ConsumerStrategy<String, String> asn4 =
+      Assign.create(parts, kafkaParams);
+
+    Assert.assertEquals(
+      asn1.executorKafkaParams().get("bootstrap.servers"),
+      asn3.executorKafkaParams().get("bootstrap.servers"));
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/dedbceec/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
--
diff --git 
a/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
 
b/external/kafka-0-10/src/test/java/org/apache/spark/streaming/kafka010/JavaDirectKafkaStreamSuite.java
new file mode 100644
index 000..e57ede7
--- /dev/null
+++ 

[2/2] spark git commit: [SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer API

2016-06-30 Thread tdas
[SPARK-12177][STREAMING][KAFKA] Update KafkaDStreams to new Kafka 0.10 Consumer 
API

## What changes were proposed in this pull request?

New Kafka consumer API for the released 0.10 version of Kafka

## How was this patch tested?

Unit tests, manual tests

Author: cody koeninger 

Closes #11863 from koeninger/kafka-0.9.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/dedbceec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/dedbceec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/dedbceec

Branch: refs/heads/master
Commit: dedbceec1ef33ccd88101016de969a1ef3e3e142
Parents: bde1d6a
Author: cody koeninger 
Authored: Wed Jun 29 23:21:03 2016 -0700
Committer: Tathagata Das 
Committed: Wed Jun 29 23:21:03 2016 -0700

--
 external/kafka-0-10-assembly/pom.xml| 176 ++
 external/kafka-0-10/pom.xml |  98 +++
 .../kafka010/CachedKafkaConsumer.scala  | 189 ++
 .../streaming/kafka010/ConsumerStrategy.scala   | 314 ++
 .../kafka010/DirectKafkaInputDStream.scala  | 318 ++
 .../spark/streaming/kafka010/KafkaRDD.scala | 232 +++
 .../streaming/kafka010/KafkaRDDPartition.scala  |  45 ++
 .../streaming/kafka010/KafkaTestUtils.scala | 277 +
 .../spark/streaming/kafka010/KafkaUtils.scala   | 175 ++
 .../streaming/kafka010/LocationStrategy.scala   |  77 +++
 .../spark/streaming/kafka010/OffsetRange.scala  | 153 +
 .../spark/streaming/kafka010/package-info.java  |  21 +
 .../spark/streaming/kafka010/package.scala  |  23 +
 .../kafka010/JavaConsumerStrategySuite.java |  84 +++
 .../kafka010/JavaDirectKafkaStreamSuite.java| 180 ++
 .../streaming/kafka010/JavaKafkaRDDSuite.java   | 122 
 .../kafka010/JavaLocationStrategySuite.java |  58 ++
 .../src/test/resources/log4j.properties |  28 +
 .../kafka010/DirectKafkaStreamSuite.scala   | 612 +++
 .../streaming/kafka010/KafkaRDDSuite.scala  | 169 +
 pom.xml |   2 +
 project/SparkBuild.scala|  12 +-
 22 files changed, 3359 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/dedbceec/external/kafka-0-10-assembly/pom.xml
--
diff --git a/external/kafka-0-10-assembly/pom.xml 
b/external/kafka-0-10-assembly/pom.xml
new file mode 100644
index 000..f2468d1
--- /dev/null
+++ b/external/kafka-0-10-assembly/pom.xml
@@ -0,0 +1,176 @@
+
+
+
+http://maven.apache.org/POM/4.0.0; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd;>
+  4.0.0
+  
+org.apache.spark
+spark-parent_2.11
+2.0.0-SNAPSHOT
+../../pom.xml
+  
+
+  org.apache.spark
+  spark-streaming-kafka-0-10-assembly_2.11
+  jar
+  Spark Integration for Kafka 0.10 Assembly
+  http://spark.apache.org/
+
+  
+streaming-kafka-0-10-assembly
+  
+
+  
+
+  org.apache.spark
+  
spark-streaming-kafka-0-10_${scala.binary.version}
+  ${project.version}
+
+
+  org.apache.spark
+  spark-streaming_${scala.binary.version}
+  ${project.version}
+  provided
+
+
+
+  commons-codec
+  commons-codec
+  provided
+
+
+  commons-lang
+  commons-lang
+  provided
+
+
+  com.google.protobuf
+  protobuf-java
+  provided
+
+
+  net.jpountz.lz4
+  lz4
+  provided
+
+
+  org.apache.hadoop
+  hadoop-client
+  provided
+
+
+  org.apache.avro
+  avro-mapred
+  ${avro.mapred.classifier}
+  provided
+
+
+  org.apache.curator
+  curator-recipes
+  provided
+
+
+  org.apache.zookeeper
+  zookeeper
+  provided
+
+
+  log4j
+  log4j
+  provided
+
+
+  net.java.dev.jets3t
+  jets3t
+  provided
+
+
+  org.scala-lang
+  scala-library
+  provided
+
+
+  org.slf4j
+  slf4j-api
+  provided
+
+
+  org.slf4j
+  slf4j-log4j12
+  provided
+
+
+  org.xerial.snappy
+  snappy-java
+  provided
+
+  
+
+  
+  
target/scala-${scala.binary.version}/classes
+  
target/scala-${scala.binary.version}/test-classes
+  
+
+  org.apache.maven.plugins
+  maven-shade-plugin
+  
+false
+
+  
+*:*
+  
+
+
+  
+*:*
+
+  META-INF/*.SF
+  META-INF/*.DSA
+  META-INF/*.RSA
+
+  
+
+  
+