spark git commit: [SPARK-19585][DOC][SQL] Fix the cacheTable and uncacheTable api call in the doc

2017-02-13 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 7fe3543fd -> c8113b0ee


[SPARK-19585][DOC][SQL] Fix the cacheTable and uncacheTable api call in the doc

## What changes were proposed in this pull request?

https://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory
In the doc, the call spark.cacheTable(“tableName”) and 
spark.uncacheTable(“tableName”) actually needs to be 
spark.catalog.cacheTable and spark.catalog.uncacheTable

## How was this patch tested?
Built the docs and verified the change shows up fine.

Author: Sunitha Kambhampati 

Closes #16919 from skambha/docChange.

(cherry picked from commit 9b5e460a9168ab78607034434ca45ab6cb51e5a6)
Signed-off-by: Xiao Li 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c8113b0e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c8113b0e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c8113b0e

Branch: refs/heads/branch-2.1
Commit: c8113b0ee0555efe72827a91246af2737d1d4993
Parents: 7fe3543
Author: Sunitha Kambhampati 
Authored: Mon Feb 13 22:49:29 2017 -0800
Committer: Xiao Li 
Committed: Mon Feb 13 22:49:40 2017 -0800

--
 docs/sql-programming-guide.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c8113b0e/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 55ed913..2173aba 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1217,9 +1217,9 @@ turning on some experimental options.
 
 ## Caching Data In Memory
 
-Spark SQL can cache tables using an in-memory columnar format by calling 
`spark.cacheTable("tableName")` or `dataFrame.cache()`.
+Spark SQL can cache tables using an in-memory columnar format by calling 
`spark.catalog.cacheTable("tableName")` or `dataFrame.cache()`.
 Then Spark SQL will scan only required columns and will automatically tune 
compression to minimize
-memory usage and GC pressure. You can call `spark.uncacheTable("tableName")` 
to remove the table from memory.
+memory usage and GC pressure. You can call 
`spark.catalog.uncacheTable("tableName")` to remove the table from memory.
 
 Configuration of in-memory caching can be done using the `setConf` method on 
`SparkSession` or by running
 `SET key=value` commands using SQL.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19585][DOC][SQL] Fix the cacheTable and uncacheTable api call in the doc

2017-02-13 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 1ab97310e -> 9b5e460a9


[SPARK-19585][DOC][SQL] Fix the cacheTable and uncacheTable api call in the doc

## What changes were proposed in this pull request?

https://spark.apache.org/docs/latest/sql-programming-guide.html#caching-data-in-memory
In the doc, the call spark.cacheTable(“tableName”) and 
spark.uncacheTable(“tableName”) actually needs to be 
spark.catalog.cacheTable and spark.catalog.uncacheTable

## How was this patch tested?
Built the docs and verified the change shows up fine.

Author: Sunitha Kambhampati 

Closes #16919 from skambha/docChange.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9b5e460a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9b5e460a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9b5e460a

Branch: refs/heads/master
Commit: 9b5e460a9168ab78607034434ca45ab6cb51e5a6
Parents: 1ab9731
Author: Sunitha Kambhampati 
Authored: Mon Feb 13 22:49:29 2017 -0800
Committer: Xiao Li 
Committed: Mon Feb 13 22:49:29 2017 -0800

--
 docs/sql-programming-guide.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9b5e460a/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 9cf480c..235f5ec 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1272,9 +1272,9 @@ turning on some experimental options.
 
 ## Caching Data In Memory
 
-Spark SQL can cache tables using an in-memory columnar format by calling 
`spark.cacheTable("tableName")` or `dataFrame.cache()`.
+Spark SQL can cache tables using an in-memory columnar format by calling 
`spark.catalog.cacheTable("tableName")` or `dataFrame.cache()`.
 Then Spark SQL will scan only required columns and will automatically tune 
compression to minimize
-memory usage and GC pressure. You can call `spark.uncacheTable("tableName")` 
to remove the table from memory.
+memory usage and GC pressure. You can call 
`spark.catalog.uncacheTable("tableName")` to remove the table from memory.
 
 Configuration of in-memory caching can be done using the `setConf` method on 
`SparkSession` or by running
 `SET key=value` commands using SQL.


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19539][SQL] Block duplicate temp table during creation

2017-02-13 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 6e45b547c -> 1ab97310e


[SPARK-19539][SQL] Block duplicate temp table during creation

## What changes were proposed in this pull request?
Current `CREATE TEMPORARY TABLE ... ` is deprecated and recommend users to use 
`CREATE TEMPORARY VIEW ...` And it does not support `IF NOT EXISTS `clause. 
However, if there is an existing temporary view defined, it is possible to 
unintentionally replace this existing view by issuing `CREATE TEMPORARY TABLE 
...`  with the same table/view name.

This PR is to disallow `CREATE TEMPORARY TABLE ...` with an existing view name.
Under the cover, `CREATE TEMPORARY TABLE ...` will be changed to create 
temporary view, however, passing in a flag `replace=false`, instead of 
currently `true`. So when creating temporary view under the cover, if there is 
existing view with the same name, the operation will be blocked.

## How was this patch tested?
New unit test case is added and updated some existing test cases to adapt the 
new behavior

Author: Xin Wu 

Closes #16878 from xwu0226/block_duplicate_temp_table.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1ab97310
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1ab97310
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1ab97310

Branch: refs/heads/master
Commit: 1ab97310e83ee138a1b210c0dfa89a341f1d530a
Parents: 6e45b54
Author: Xin Wu 
Authored: Mon Feb 13 19:45:58 2017 -0800
Committer: Xiao Li 
Committed: Mon Feb 13 19:45:58 2017 -0800

--
 .../spark/sql/execution/SparkSqlParser.scala|   4 +-
 .../org/apache/spark/sql/SQLQuerySuite.scala|  18 ++--
 .../spark/sql/execution/command/DDLSuite.scala  |  58 +-
 .../RowDataSourceStrategySuite.scala|   4 +-
 .../execution/datasources/csv/CSVSuite.scala| 105 ++-
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   |  64 +--
 .../apache/spark/sql/sources/DDLTestSuite.scala |  14 +--
 .../apache/spark/sql/sources/InsertSuite.scala  |   4 +-
 .../sql/hive/execution/HiveCommandSuite.scala   |   8 +-
 .../sql/hive/execution/SQLQuerySuite.scala  |  18 ++--
 10 files changed, 160 insertions(+), 137 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/1ab97310/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index ca76a10..d508002 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -425,7 +425,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder {
 
 logWarning(s"CREATE TEMPORARY TABLE ... USING ... is deprecated, 
please use " +
   "CREATE TEMPORARY VIEW ... USING ... instead")
-CreateTempViewUsing(table, schema, replace = true, global = false, 
provider, options)
+// Unlike CREATE TEMPORARY VIEW USING, CREATE TEMPORARY TABLE USING 
does not support
+// IF NOT EXISTS. Users are not allowed to replace the existing temp 
table.
+CreateTempViewUsing(table, schema, replace = false, global = false, 
provider, options)
   } else {
 CreateTable(tableDesc, mode, None)
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1ab97310/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
--
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 9c95b12..40d0ce0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -1571,7 +1571,7 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
 }
   }
 
-  test("specifying database name for a temporary table is not allowed") {
+  test("specifying database name for a temporary view is not allowed") {
 withTempPath { dir =>
   val path = dir.toURI.toString
   val df =
@@ -1585,23 +1585,23 @@ class SQLQuerySuite extends QueryTest with 
SharedSQLContext {
   intercept[AnalysisException] {
 spark.sql(
   s"""
-  |CREATE TEMPORARY TABLE db.t
-  |USING parquet
-  |OPTIONS (
-  |  path '$path'
-  |)
-""".stripMargin)
+|CREATE TEMPORARY VIEW db.t
+|USING parquet
+|OPTIONS (
+|  path '$path'
+

spark git commit: [SPARK-19115][SQL] Supporting Create Table Like Location

2017-02-13 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master e02ac303c -> 6e45b547c


[SPARK-19115][SQL] Supporting Create Table Like Location

What changes were proposed in this pull request?

Support CREATE [EXTERNAL] TABLE LIKE LOCATION... syntax for Hive serde and 
datasource tables.
In this PR,we follow SparkSQL design rules :

supporting create table like view or physical table or temporary view with 
location.
creating a table with location,this table will be an external table other 
than managed table.

How was this patch tested?

Add new test cases and update existing test cases

Author: ouyangxiaochen 

Closes #16868 from ouyangxiaochen/spark19115.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6e45b547
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6e45b547
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6e45b547

Branch: refs/heads/master
Commit: 6e45b547ceadbbe8394bf149945b7942df82660a
Parents: e02ac30
Author: ouyangxiaochen 
Authored: Mon Feb 13 19:41:44 2017 -0800
Committer: Xiao Li 
Committed: Mon Feb 13 19:41:44 2017 -0800

--
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   2 +-
 .../spark/sql/execution/SparkSqlParser.scala|   5 +-
 .../spark/sql/execution/command/tables.scala|  14 +-
 .../spark/sql/hive/HiveDDLCommandSuite.scala|  32 -
 .../spark/sql/hive/execution/HiveDDLSuite.scala | 143 +++
 5 files changed, 159 insertions(+), 37 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6e45b547/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
--
diff --git 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index c95c1f5..3123998 100644
--- 
a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ 
b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -82,7 +82,7 @@ statement
 (TBLPROPERTIES tablePropertyList)?
 (AS? query)?   
#createHiveTable
 | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
-LIKE source=tableIdentifier
#createTableLike
+LIKE source=tableIdentifier locationSpec?  
#createTableLike
 | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
 (identifier | FOR COLUMNS identifierSeq)?  #analyze
 | ALTER (TABLE | VIEW) from=tableIdentifier

http://git-wip-us.apache.org/repos/asf/spark/blob/6e45b547/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 41768d4..ca76a10 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -1141,13 +1141,14 @@ class SparkSqlAstBuilder(conf: SQLConf) extends 
AstBuilder {
* For example:
* {{{
*   CREATE TABLE [IF NOT EXISTS] [db_name.]table_name
-   *   LIKE [other_db_name.]existing_table_name
+   *   LIKE [other_db_name.]existing_table_name [locationSpec]
* }}}
*/
   override def visitCreateTableLike(ctx: CreateTableLikeContext): LogicalPlan 
= withOrigin(ctx) {
 val targetTable = visitTableIdentifier(ctx.target)
 val sourceTable = visitTableIdentifier(ctx.source)
-CreateTableLikeCommand(targetTable, sourceTable, ctx.EXISTS != null)
+val location = Option(ctx.locationSpec).map(visitLocationSpec)
+CreateTableLikeCommand(targetTable, sourceTable, location, ctx.EXISTS != 
null)
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/spark/blob/6e45b547/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index bc4b5b6..d646a21 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
 /**
- * A command to create a MANAGED table with the same definition of the given 

spark git commit: [SPARK-19429][PYTHON][SQL] Support slice arguments in Column.__getitem__

2017-02-13 Thread holden
Repository: spark
Updated Branches:
  refs/heads/master 0169360ef -> e02ac303c


[SPARK-19429][PYTHON][SQL] Support slice arguments in Column.__getitem__

## What changes were proposed in this pull request?

- Add support for `slice` arguments in `Column.__getitem__`.
- Remove obsolete `__getslice__` bindings.

## How was this patch tested?

Existing unit tests, additional tests covering `[]` with `slice`.

Author: zero323 

Closes #16771 from zero323/SPARK-19429.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e02ac303
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e02ac303
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e02ac303

Branch: refs/heads/master
Commit: e02ac303c6356cdf7fffec7361311d828a723afe
Parents: 0169360
Author: zero323 
Authored: Mon Feb 13 15:23:56 2017 -0800
Committer: Holden Karau 
Committed: Mon Feb 13 15:23:56 2017 -0800

--
 python/pyspark/sql/column.py | 11 ---
 python/pyspark/sql/tests.py  |  8 
 2 files changed, 16 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/e02ac303/python/pyspark/sql/column.py
--
diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py
index ec059d6..73c8672 100644
--- a/python/pyspark/sql/column.py
+++ b/python/pyspark/sql/column.py
@@ -180,7 +180,6 @@ class Column(object):
 
 # container operators
 __contains__ = _bin_op("contains")
-__getitem__ = _bin_op("apply")
 
 # bitwise operators
 bitwiseOR = _bin_op("bitwiseOR")
@@ -236,6 +235,14 @@ class Column(object):
 raise AttributeError(item)
 return self.getField(item)
 
+def __getitem__(self, k):
+if isinstance(k, slice):
+if k.step is not None:
+raise ValueError("slice with step is not supported.")
+return self.substr(k.start, k.stop)
+else:
+return _bin_op("apply")(self, k)
+
 def __iter__(self):
 raise TypeError("Column is not iterable")
 
@@ -267,8 +274,6 @@ class Column(object):
 raise TypeError("Unexpected type: %s" % type(startPos))
 return Column(jc)
 
-__getslice__ = substr
-
 @ignore_unicode_prefix
 @since(1.5)
 def isin(self, *cols):

http://git-wip-us.apache.org/repos/asf/spark/blob/e02ac303/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index ab9d3f6..d9d0333 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -874,6 +874,14 @@ class SQLTests(ReusedPySparkTestCase):
 self.assertTrue(all(isinstance(c, Column) for c in css))
 self.assertTrue(isinstance(ci.cast(LongType()), Column))
 
+def test_column_getitem(self):
+from pyspark.sql.functions import col
+
+self.assertIsInstance(col("foo")[1:3], Column)
+self.assertIsInstance(col("foo")[0], Column)
+self.assertIsInstance(col("foo")["bar"], Column)
+self.assertRaises(ValueError, lambda: col("foo")[0:10:2])
+
 def test_column_select(self):
 df = self.df
 self.assertEqual(self.testData, df.select("*").collect())


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19520][STREAMING] Do not encrypt data written to the WAL.

2017-02-13 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 9af8f743b -> 0169360ef


[SPARK-19520][STREAMING] Do not encrypt data written to the WAL.

Spark's I/O encryption uses an ephemeral key for each driver instance.
So driver B cannot decrypt data written by driver A since it doesn't
have the correct key.

The write ahead log is used for recovery, thus needs to be readable by
a different driver. So it cannot be encrypted by Spark's I/O encryption
code.

The BlockManager APIs used by the WAL code to write the data automatically
encrypt data, so changes are needed so that callers can to opt out of
encryption.

Aside from that, the "putBytes" API in the BlockManager does not do
encryption, so a separate situation arised where the WAL would write
unencrypted data to the BM and, when those blocks were read, decryption
would fail. So the WAL code needs to ask the BM to encrypt that data
when encryption is enabled; this code is not optimal since it results
in a (temporary) second copy of the data block in memory, but should be
OK for now until a more performant solution is added. The non-encryption
case should not be affected.

Tested with new unit tests, and by running streaming apps that do
recovery using the WAL data with I/O encryption turned on.

Author: Marcelo Vanzin 

Closes #16862 from vanzin/SPARK-19520.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0169360e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0169360e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0169360e

Branch: refs/heads/master
Commit: 0169360ef58891ca10a8d64d1c8637c7b873cbdd
Parents: 9af8f74
Author: Marcelo Vanzin 
Authored: Mon Feb 13 14:19:41 2017 -0800
Committer: Marcelo Vanzin 
Committed: Mon Feb 13 14:19:41 2017 -0800

--
 .../org/apache/spark/SecurityManager.scala  |  2 +-
 .../spark/serializer/SerializerManager.scala| 20 ++---
 .../org/apache/spark/storage/BlockManager.scala | 35 +++-
 docs/streaming-programming-guide.md |  3 ++
 .../rdd/WriteAheadLogBackedBlockRDD.scala   |  9 ++--
 .../receiver/ReceivedBlockHandler.scala | 11 +++--
 .../streaming/ReceivedBlockHandlerSuite.scala   | 27 +---
 .../rdd/WriteAheadLogBackedBlockRDDSuite.scala  | 43 
 8 files changed, 120 insertions(+), 30 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0169360e/core/src/main/scala/org/apache/spark/SecurityManager.scala
--
diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala 
b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index cde7682..2480e56 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -184,7 +184,7 @@ import org.apache.spark.util.Utils
 
 private[spark] class SecurityManager(
 sparkConf: SparkConf,
-ioEncryptionKey: Option[Array[Byte]] = None)
+val ioEncryptionKey: Option[Array[Byte]] = None)
   extends Logging with SecretKeyHolder {
 
   import SecurityManager._

http://git-wip-us.apache.org/repos/asf/spark/blob/0169360e/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
--
diff --git 
a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala 
b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
index 748f0a3..96b288b 100644
--- a/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala
@@ -171,20 +171,26 @@ private[spark] class SerializerManager(
   }
 
   /** Serializes into a chunked byte buffer. */
-  def dataSerialize[T: ClassTag](blockId: BlockId, values: Iterator[T]): 
ChunkedByteBuffer = {
-dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]])
+  def dataSerialize[T: ClassTag](
+  blockId: BlockId,
+  values: Iterator[T],
+  allowEncryption: Boolean = true): ChunkedByteBuffer = {
+dataSerializeWithExplicitClassTag(blockId, values, implicitly[ClassTag[T]],
+  allowEncryption = allowEncryption)
   }
 
   /** Serializes into a chunked byte buffer. */
   def dataSerializeWithExplicitClassTag(
   blockId: BlockId,
   values: Iterator[_],
-  classTag: ClassTag[_]): ChunkedByteBuffer = {
+  classTag: ClassTag[_],
+  allowEncryption: Boolean = true): ChunkedByteBuffer = {
 val bbos = new ChunkedByteBufferOutputStream(1024 * 1024 * 4, 
ByteBuffer.allocate)
 val byteStream = new BufferedOutputStream(bbos)
 val autoPick = !blockId.isInstanceOf[StreamBlockId]
 val 

spark git commit: [SPARK-19435][SQL] Type coercion between ArrayTypes

2017-02-13 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 905fdf0c2 -> 9af8f743b


[SPARK-19435][SQL] Type coercion between ArrayTypes

## What changes were proposed in this pull request?

This PR proposes to support type coercion between `ArrayType`s where the 
element types are compatible.

**Before**

```
Seq(Array(1)).toDF("a").selectExpr("greatest(a, array(1D))")
org.apache.spark.sql.AnalysisException: cannot resolve 'greatest(`a`, 
array(1.0D))' due to data type mismatch: The expressions should all have the 
same type, got GREATEST(array, array).; line 1 pos 0;

Seq(Array(1)).toDF("a").selectExpr("least(a, array(1D))")
org.apache.spark.sql.AnalysisException: cannot resolve 'least(`a`, 
array(1.0D))' due to data type mismatch: The expressions should all have the 
same type, got LEAST(array, array).; line 1 pos 0;

sql("SELECT * FROM values (array(0)), (array(1D)) as data(a)")
org.apache.spark.sql.AnalysisException: incompatible types found in column a 
for inline table; line 1 pos 14

Seq(Array(1)).toDF("a").union(Seq(Array(1D)).toDF("b"))
org.apache.spark.sql.AnalysisException: Union can only be performed on tables 
with the compatible column types. ArrayType(DoubleType,false) <> 
ArrayType(IntegerType,false) at the first column of the second table;;

sql("SELECT IF(1=1, array(1), array(1D))")
org.apache.spark.sql.AnalysisException: cannot resolve '(IF((1 = 1), array(1), 
array(1.0D)))' due to data type mismatch: differing types in '(IF((1 = 1), 
array(1), array(1.0D)))' (array and array).; line 1 pos 7;
```

**After**

```scala
Seq(Array(1)).toDF("a").selectExpr("greatest(a, array(1D))")
res5: org.apache.spark.sql.DataFrame = [greatest(a, array(1.0)): array]

Seq(Array(1)).toDF("a").selectExpr("least(a, array(1D))")
res6: org.apache.spark.sql.DataFrame = [least(a, array(1.0)): array]

sql("SELECT * FROM values (array(0)), (array(1D)) as data(a)")
res8: org.apache.spark.sql.DataFrame = [a: array]

Seq(Array(1)).toDF("a").union(Seq(Array(1D)).toDF("b"))
res10: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [a: 
array]

sql("SELECT IF(1=1, array(1), array(1D))")
res15: org.apache.spark.sql.DataFrame = [(IF((1 = 1), array(1), array(1.0))): 
array]
```

## How was this patch tested?

Unit tests in `TypeCoercion` and Jenkins tests and

building with scala 2.10

```scala
./dev/change-scala-version.sh 2.10
./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
```

Author: hyukjinkwon 

Closes #16777 from HyukjinKwon/SPARK-19435.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9af8f743
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9af8f743
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9af8f743

Branch: refs/heads/master
Commit: 9af8f743b1f9fdf8813481464c3837331ad9
Parents: 905fdf0
Author: hyukjinkwon 
Authored: Mon Feb 13 13:10:57 2017 -0800
Committer: Xiao Li 
Committed: Mon Feb 13 13:10:57 2017 -0800

--
 .../sql/catalyst/analysis/TypeCoercion.scala| 80 +++
 .../catalyst/analysis/TypeCoercionSuite.scala   | 83 +---
 2 files changed, 120 insertions(+), 43 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/9af8f743/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index dfaac92..2c00957 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -101,13 +101,11 @@ object TypeCoercion {
 case _ => None
   }
 
-  /** Similar to [[findTightestCommonType]], but can promote all the way to 
StringType. */
-  def findTightestCommonTypeToString(left: DataType, right: DataType): 
Option[DataType] = {
-findTightestCommonType(left, right).orElse((left, right) match {
-  case (StringType, t2: AtomicType) if t2 != BinaryType && t2 != 
BooleanType => Some(StringType)
-  case (t1: AtomicType, StringType) if t1 != BinaryType && t1 != 
BooleanType => Some(StringType)
-  case _ => None
-})
+  /** Promotes all the way to StringType. */
+  private def stringPromotion(dt1: DataType, dt2: DataType): Option[DataType] 
= (dt1, dt2) match {
+case (StringType, t2: AtomicType) if t2 != BinaryType && t2 != BooleanType 
=> Some(StringType)
+case (t1: AtomicType, StringType) if t1 != BinaryType && t1 != BooleanType 
=> Some(StringType)
+case _ => None
   }
 
   /**
@@ -117,21 +115,17 @@ object TypeCoercion {
 

spark git commit: [SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()

2017-02-13 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.0 23050c8a1 -> f50c4372c


[SPARK-19529] TransportClientFactory.createClient() shouldn't call 
awaitUninterruptibly()

This patch replaces a single `awaitUninterruptibly()` call with a plain 
`await()` call in Spark's `network-common` library in order to fix a bug which 
may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls 
`awaitUninterruptibly()` on a Netty future while waiting for a connection to be 
established. This creates problem when a Spark task is interrupted while 
blocking in this call (which can happen in the event of a slow connection which 
will eventually time out). This has bad impacts on task cancellation when 
`interruptOnCancel = true`.

As an example of the impact of this problem, I experienced significant numbers 
of uncancellable "zombie tasks" on a production cluster where several tasks 
were blocked trying to connect to a dead shuffle server and then continued 
running as zombies after I cancelled the associated Spark stage. The zombie 
tasks ran for several minutes with the following stack:

```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
 => holding Monitor(java.lang.Object1849476028})
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:
350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```

As far as I can tell, `awaitUninterruptibly()` might have been used in order to 
avoid having to declare that methods throw `InterruptedException` (this code is 
written in Java, hence the need to use checked exceptions). This patch simply 
replaces this with a regular, interruptible `await()` call,.

This required several interface changes to declare a new checked exception 
(these are internal interfaces, though, and this change doesn't significantly 
impact binary compatibility).

An alternative approach would be to wrap `InterruptedException` into 
`IOException` in order to avoid having to change interfaces. The problem with 
this approach is that the `network-shuffle` project's `RetryingBlockFetcher` 
code treats `IOExceptions` as transitive failures when deciding whether to 
retry fetches, so throwing a wrapped `IOException` might cause an interrupted 
shuffle fetch to be retried, further prolonging the lifetime of a cancelled 
zombie task.

Note that there are three other `awaitUninterruptibly()` in the codebase, but 
those calls have a hard 10 second timeout and are waiting on a `close()` 
operation which is expected to complete near instantaneously, so the impact of 
uninterruptibility there is much smaller.

Manually.

Author: Josh Rosen 

Closes #16866 from JoshRosen/SPARK-19529.

(cherry picked from commit 1c4d10b10c78d138b55e381ec6828e04fef70d6f)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/f50c4372
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/f50c4372
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/f50c4372

Branch: refs/heads/branch-2.0
Commit: f50c4372c3ebd91c0f6c094a7c4d1dd08f3cdb30
Parents: 23050c8
Author: Josh Rosen 
Authored: Mon Feb 13 11:04:27 2017 -0800
Committer: Cheng Lian 
Committed: Mon Feb 13 12:57:29 2017 -0800

--
 .../network/client/TransportClientFactory.java  | 10 ++
 .../spark/network/TransportClientFactorySuite.java  |  6 --
 .../network/shuffle/ExternalShuffleClient.java  |  4 ++--
 

spark git commit: [SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()

2017-02-13 Thread lian
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 2968d8c06 -> 5db234730


[SPARK-19529] TransportClientFactory.createClient() shouldn't call 
awaitUninterruptibly()

This patch replaces a single `awaitUninterruptibly()` call with a plain 
`await()` call in Spark's `network-common` library in order to fix a bug which 
may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls 
`awaitUninterruptibly()` on a Netty future while waiting for a connection to be 
established. This creates problem when a Spark task is interrupted while 
blocking in this call (which can happen in the event of a slow connection which 
will eventually time out). This has bad impacts on task cancellation when 
`interruptOnCancel = true`.

As an example of the impact of this problem, I experienced significant numbers 
of uncancellable "zombie tasks" on a production cluster where several tasks 
were blocked trying to connect to a dead shuffle server and then continued 
running as zombies after I cancelled the associated Spark stage. The zombie 
tasks ran for several minutes with the following stack:

```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
 => holding Monitor(java.lang.Object1849476028})
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:
350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```

As far as I can tell, `awaitUninterruptibly()` might have been used in order to 
avoid having to declare that methods throw `InterruptedException` (this code is 
written in Java, hence the need to use checked exceptions). This patch simply 
replaces this with a regular, interruptible `await()` call,.

This required several interface changes to declare a new checked exception 
(these are internal interfaces, though, and this change doesn't significantly 
impact binary compatibility).

An alternative approach would be to wrap `InterruptedException` into 
`IOException` in order to avoid having to change interfaces. The problem with 
this approach is that the `network-shuffle` project's `RetryingBlockFetcher` 
code treats `IOExceptions` as transitive failures when deciding whether to 
retry fetches, so throwing a wrapped `IOException` might cause an interrupted 
shuffle fetch to be retried, further prolonging the lifetime of a cancelled 
zombie task.

Note that there are three other `awaitUninterruptibly()` in the codebase, but 
those calls have a hard 10 second timeout and are waiting on a `close()` 
operation which is expected to complete near instantaneously, so the impact of 
uninterruptibility there is much smaller.

Manually.

Author: Josh Rosen 

Closes #16866 from JoshRosen/SPARK-19529.

(cherry picked from commit 1c4d10b10c78d138b55e381ec6828e04fef70d6f)
Signed-off-by: Cheng Lian 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5db23473
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5db23473
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5db23473

Branch: refs/heads/branch-2.1
Commit: 5db23473008a58fb9a7f77ad8b01bcdc2c5f2d9c
Parents: 2968d8c
Author: Josh Rosen 
Authored: Mon Feb 13 11:04:27 2017 -0800
Committer: Cheng Lian 
Committed: Mon Feb 13 12:49:37 2017 -0800

--
 .../network/client/TransportClientFactory.java  | 10 ++
 .../spark/network/TransportClientFactorySuite.java  |  6 --
 .../network/shuffle/ExternalShuffleClient.java  |  4 ++--
 

spark git commit: [HOTFIX][SPARK-19542][SS]Fix the missing import in DataStreamReaderWriterSuite

2017-02-13 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 328b22984 -> 2968d8c06


[HOTFIX][SPARK-19542][SS]Fix the missing import in DataStreamReaderWriterSuite


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2968d8c0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2968d8c0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2968d8c0

Branch: refs/heads/branch-2.1
Commit: 2968d8c0666801fb6a363dfca3c5a85ee8a1cc0c
Parents: 328b229
Author: Shixiong Zhu 
Authored: Mon Feb 13 12:35:56 2017 -0800
Committer: Shixiong Zhu 
Committed: Mon Feb 13 12:36:00 2017 -0800

--
 .../spark/sql/streaming/test/DataStreamReaderWriterSuite.scala  | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2968d8c0/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
index f751948..4e63b04 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/test/DataStreamReaderWriterSuite.scala
@@ -22,6 +22,7 @@ import java.util.concurrent.TimeUnit
 
 import scala.concurrent.duration._
 
+import org.apache.hadoop.fs.Path
 import org.mockito.Mockito._
 import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 import org.scalatest.PrivateMethodTester.PrivateMethod


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-17714][CORE][TEST-MAVEN][TEST-HADOOP2.6] Avoid using ExecutorClassLoader to load Netty generated classes

2017-02-13 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 c5a7cb022 -> 328b22984


[SPARK-17714][CORE][TEST-MAVEN][TEST-HADOOP2.6] Avoid using ExecutorClassLoader 
to load Netty generated classes

## What changes were proposed in this pull request?

Netty's `MessageToMessageEncoder` uses 
[Javassist](https://github.com/netty/netty/blob/91a0bdc17a8298437d6de08a8958d753799bd4a6/common/src/main/java/io/netty/util/internal/JavassistTypeParameterMatcherGenerator.java#L62)
 to generate a matcher class and the implementation calls `Class.forName` to 
check if this class is already generated. If `MessageEncoder` or 
`MessageDecoder` is created in `ExecutorClassLoader.findClass`, it will cause 
`ClassCircularityError`. This is because loading this Netty generated class 
will call `ExecutorClassLoader.findClass` to search this class, and 
`ExecutorClassLoader` will try to use RPC to load it and cause to load the 
non-exist matcher class again. JVM will report `ClassCircularityError` to 
prevent such infinite recursion.

# Why it only happens in Maven builds

It's because Maven and SBT have different class loader tree. The Maven build 
will set a URLClassLoader as the current context class loader to run the tests 
and expose this issue. The class loader tree is as following:

```
bootstrap class loader -- ... - REPL class loader  
ExecutorClassLoader
|
|
URLClasssLoader
```

The SBT build uses the bootstrap class loader directly and 
`ReplSuite.test("propagation of local properties")` is the first test in 
ReplSuite, which happens to load 
`io/netty/util/internal/__matchers__/org/apache/spark/network/protocol/MessageMatcher`
 into the bootstrap class loader (Note: in maven build, it's loaded into 
URLClasssLoader so it cannot be found in ExecutorClassLoader). This issue can 
be reproduced in SBT as well. Here are the produce steps:
- Enable `hadoop.caller.context.enabled`.
- Replace `Class.forName` with `Utils.classForName` in `object CallerContext`.
- Ignore `ReplSuite.test("propagation of local properties")`.
- Run `ReplSuite` using SBT.

This PR just creates a singleton MessageEncoder and MessageDecoder and makes 
sure they are created before switching to ExecutorClassLoader. TransportContext 
will be created when creating RpcEnv and that happens before creating 
ExecutorClassLoader.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16859 from zsxwing/SPARK-17714.

(cherry picked from commit 905fdf0c243e1776c54c01a25b17878361400225)
Signed-off-by: Shixiong Zhu 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/328b2298
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/328b2298
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/328b2298

Branch: refs/heads/branch-2.1
Commit: 328b229840d6e87c7faf7ee3cd5bf66a905c9a7d
Parents: c5a7cb0
Author: Shixiong Zhu 
Authored: Mon Feb 13 12:03:36 2017 -0800
Committer: Shixiong Zhu 
Committed: Mon Feb 13 12:03:44 2017 -0800

--
 .../apache/spark/network/TransportContext.java  | 22 ++--
 .../spark/network/protocol/MessageDecoder.java  |  4 
 .../spark/network/protocol/MessageEncoder.java  |  4 
 .../network/server/TransportChannelHandler.java | 11 +-
 .../org/apache/spark/network/ProtocolSuite.java |  8 +++
 .../scala/org/apache/spark/util/Utils.scala | 16 --
 6 files changed, 38 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/328b2298/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
--
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 
b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index 5b69e2b..37ba543 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -62,8 +62,20 @@ public class TransportContext {
   private final RpcHandler rpcHandler;
   private final boolean closeIdleConnections;
 
-  private final MessageEncoder encoder;
-  private final MessageDecoder decoder;
+  /**
+   * Force to create MessageEncoder and MessageDecoder so that we can make 
sure they will be created
+   * before switching the current context class loader to ExecutorClassLoader.
+   *
+   * Netty's MessageToMessageEncoder uses Javassist to generate a matcher 
class and the
+   * implementation calls "Class.forName" to check if this calls is already 
generated. If the
+   * following two objects are created in "ExecutorClassLoader.findClass", 

spark git commit: [SPARK-17714][CORE][TEST-MAVEN][TEST-HADOOP2.6] Avoid using ExecutorClassLoader to load Netty generated classes

2017-02-13 Thread zsxwing
Repository: spark
Updated Branches:
  refs/heads/master 3dbff9be0 -> 905fdf0c2


[SPARK-17714][CORE][TEST-MAVEN][TEST-HADOOP2.6] Avoid using ExecutorClassLoader 
to load Netty generated classes

## What changes were proposed in this pull request?

Netty's `MessageToMessageEncoder` uses 
[Javassist](https://github.com/netty/netty/blob/91a0bdc17a8298437d6de08a8958d753799bd4a6/common/src/main/java/io/netty/util/internal/JavassistTypeParameterMatcherGenerator.java#L62)
 to generate a matcher class and the implementation calls `Class.forName` to 
check if this class is already generated. If `MessageEncoder` or 
`MessageDecoder` is created in `ExecutorClassLoader.findClass`, it will cause 
`ClassCircularityError`. This is because loading this Netty generated class 
will call `ExecutorClassLoader.findClass` to search this class, and 
`ExecutorClassLoader` will try to use RPC to load it and cause to load the 
non-exist matcher class again. JVM will report `ClassCircularityError` to 
prevent such infinite recursion.

# Why it only happens in Maven builds

It's because Maven and SBT have different class loader tree. The Maven build 
will set a URLClassLoader as the current context class loader to run the tests 
and expose this issue. The class loader tree is as following:

```
bootstrap class loader -- ... - REPL class loader  
ExecutorClassLoader
|
|
URLClasssLoader
```

The SBT build uses the bootstrap class loader directly and 
`ReplSuite.test("propagation of local properties")` is the first test in 
ReplSuite, which happens to load 
`io/netty/util/internal/__matchers__/org/apache/spark/network/protocol/MessageMatcher`
 into the bootstrap class loader (Note: in maven build, it's loaded into 
URLClasssLoader so it cannot be found in ExecutorClassLoader). This issue can 
be reproduced in SBT as well. Here are the produce steps:
- Enable `hadoop.caller.context.enabled`.
- Replace `Class.forName` with `Utils.classForName` in `object CallerContext`.
- Ignore `ReplSuite.test("propagation of local properties")`.
- Run `ReplSuite` using SBT.

This PR just creates a singleton MessageEncoder and MessageDecoder and makes 
sure they are created before switching to ExecutorClassLoader. TransportContext 
will be created when creating RpcEnv and that happens before creating 
ExecutorClassLoader.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu 

Closes #16859 from zsxwing/SPARK-17714.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/905fdf0c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/905fdf0c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/905fdf0c

Branch: refs/heads/master
Commit: 905fdf0c243e1776c54c01a25b17878361400225
Parents: 3dbff9b
Author: Shixiong Zhu 
Authored: Mon Feb 13 12:03:36 2017 -0800
Committer: Shixiong Zhu 
Committed: Mon Feb 13 12:03:36 2017 -0800

--
 .../apache/spark/network/TransportContext.java  | 22 ++--
 .../spark/network/protocol/MessageDecoder.java  |  4 
 .../spark/network/protocol/MessageEncoder.java  |  4 
 .../network/server/TransportChannelHandler.java | 11 +-
 .../org/apache/spark/network/ProtocolSuite.java |  8 +++
 .../scala/org/apache/spark/util/Utils.scala | 16 --
 6 files changed, 38 insertions(+), 27 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/905fdf0c/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
--
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 
b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index 5b69e2b..37ba543 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -62,8 +62,20 @@ public class TransportContext {
   private final RpcHandler rpcHandler;
   private final boolean closeIdleConnections;
 
-  private final MessageEncoder encoder;
-  private final MessageDecoder decoder;
+  /**
+   * Force to create MessageEncoder and MessageDecoder so that we can make 
sure they will be created
+   * before switching the current context class loader to ExecutorClassLoader.
+   *
+   * Netty's MessageToMessageEncoder uses Javassist to generate a matcher 
class and the
+   * implementation calls "Class.forName" to check if this calls is already 
generated. If the
+   * following two objects are created in "ExecutorClassLoader.findClass", it 
will cause
+   * "ClassCircularityError". This is because loading this Netty generated 
class will call
+   * 

spark git commit: [SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without errors

2017-02-13 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 ef4fb7ebc -> c5a7cb022


[SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without 
errors

## What changes were proposed in this pull request?

When a query uses a temp checkpoint dir, it's better to delete it if it's 
stopped without errors.

## How was this patch tested?

New unit tests.

Author: Shixiong Zhu 

Closes #16880 from zsxwing/delete-temp-checkpoint.

(cherry picked from commit 3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529)
Signed-off-by: Burak Yavuz 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/c5a7cb02
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/c5a7cb02
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/c5a7cb02

Branch: refs/heads/branch-2.1
Commit: c5a7cb0225ed4ed0d1ede5da0593b258c5dfd79f
Parents: ef4fb7e
Author: Shixiong Zhu 
Authored: Mon Feb 13 11:54:54 2017 -0800
Committer: Burak Yavuz 
Committed: Mon Feb 13 11:55:11 2017 -0800

--
 .../execution/streaming/StreamExecution.scala   | 24 --
 .../sql/streaming/StreamingQueryManager.scala   |  6 -
 .../test/DataStreamReaderWriterSuite.scala  | 26 
 3 files changed, 53 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/c5a7cb02/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index a35950e..a8ec73e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.IOException
 import java.util.UUID
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.locks.ReentrantLock
@@ -41,16 +42,20 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, 
Utils}
  * Unlike a standard query, a streaming query executes repeatedly each time 
new data arrives at any
  * [[Source]] present in the query plan. Whenever new data arrives, a 
[[QueryExecution]] is created
  * and the results are committed transactionally to the given [[Sink]].
+ *
+ * @param deleteCheckpointOnStop whether to delete the checkpoint if the query 
is stopped without
+ *   errors
  */
 class StreamExecution(
 override val sparkSession: SparkSession,
 override val name: String,
-checkpointRoot: String,
+val checkpointRoot: String,
 analyzedPlan: LogicalPlan,
 val sink: Sink,
 val trigger: Trigger,
 val triggerClock: Clock,
-val outputMode: OutputMode)
+val outputMode: OutputMode,
+deleteCheckpointOnStop: Boolean)
   extends StreamingQuery with ProgressReporter with Logging {
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
@@ -213,6 +218,7 @@ class StreamExecution(
* has been posted to all the listeners.
*/
   def start(): Unit = {
+logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query 
checkpoint.")
 microBatchThread.setDaemon(true)
 microBatchThread.start()
 startLatch.await()  // Wait until thread started and QueryStart event has 
been posted
@@ -323,6 +329,20 @@ class StreamExecution(
 sparkSession.streams.notifyQueryTermination(StreamExecution.this)
 postEvent(
   new QueryTerminatedEvent(id, runId, 
exception.map(_.cause).map(Utils.exceptionString)))
+
+// Delete the temp checkpoint only when the query didn't fail
+if (deleteCheckpointOnStop && exception.isEmpty) {
+  val checkpointPath = new Path(checkpointRoot)
+  try {
+val fs = 
checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+fs.delete(checkpointPath, true)
+  } catch {
+case NonFatal(e) =>
+  // Deleting temp checkpoint folder is best effort, don't throw 
non fatal exceptions
+  // when we cannot delete them.
+  logWarning(s"Cannot delete $checkpointPath", e)
+  }
+}
   } finally {
 terminationLatch.countDown()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/c5a7cb02/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
--
diff --git 

spark git commit: [SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without errors

2017-02-13 Thread brkyvz
Repository: spark
Updated Branches:
  refs/heads/master 0417ce878 -> 3dbff9be0


[SPARK-19542][SS] Delete the temp checkpoint if a query is stopped without 
errors

## What changes were proposed in this pull request?

When a query uses a temp checkpoint dir, it's better to delete it if it's 
stopped without errors.

## How was this patch tested?

New unit tests.

Author: Shixiong Zhu 

Closes #16880 from zsxwing/delete-temp-checkpoint.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3dbff9be
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3dbff9be
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3dbff9be

Branch: refs/heads/master
Commit: 3dbff9be06c2007fdb2ad4a1e113f3bc7fc06529
Parents: 0417ce8
Author: Shixiong Zhu 
Authored: Mon Feb 13 11:54:54 2017 -0800
Committer: Burak Yavuz 
Committed: Mon Feb 13 11:54:54 2017 -0800

--
 .../execution/streaming/StreamExecution.scala   | 24 --
 .../sql/streaming/StreamingQueryManager.scala   |  6 -
 .../test/DataStreamReaderWriterSuite.scala  | 26 
 3 files changed, 53 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/3dbff9be/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index ea37194..3149ef0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.io.IOException
 import java.util.UUID
 import java.util.concurrent.{CountDownLatch, TimeUnit}
 import java.util.concurrent.locks.ReentrantLock
@@ -41,16 +42,20 @@ import org.apache.spark.util.{Clock, UninterruptibleThread, 
Utils}
  * Unlike a standard query, a streaming query executes repeatedly each time 
new data arrives at any
  * [[Source]] present in the query plan. Whenever new data arrives, a 
[[QueryExecution]] is created
  * and the results are committed transactionally to the given [[Sink]].
+ *
+ * @param deleteCheckpointOnStop whether to delete the checkpoint if the query 
is stopped without
+ *   errors
  */
 class StreamExecution(
 override val sparkSession: SparkSession,
 override val name: String,
-checkpointRoot: String,
+val checkpointRoot: String,
 analyzedPlan: LogicalPlan,
 val sink: Sink,
 val trigger: Trigger,
 val triggerClock: Clock,
-val outputMode: OutputMode)
+val outputMode: OutputMode,
+deleteCheckpointOnStop: Boolean)
   extends StreamingQuery with ProgressReporter with Logging {
 
   import org.apache.spark.sql.streaming.StreamingQueryListener._
@@ -213,6 +218,7 @@ class StreamExecution(
* has been posted to all the listeners.
*/
   def start(): Unit = {
+logInfo(s"Starting $prettyIdString. Use $checkpointRoot to store the query 
checkpoint.")
 microBatchThread.setDaemon(true)
 microBatchThread.start()
 startLatch.await()  // Wait until thread started and QueryStart event has 
been posted
@@ -323,6 +329,20 @@ class StreamExecution(
 sparkSession.streams.notifyQueryTermination(StreamExecution.this)
 postEvent(
   new QueryTerminatedEvent(id, runId, 
exception.map(_.cause).map(Utils.exceptionString)))
+
+// Delete the temp checkpoint only when the query didn't fail
+if (deleteCheckpointOnStop && exception.isEmpty) {
+  val checkpointPath = new Path(checkpointRoot)
+  try {
+val fs = 
checkpointPath.getFileSystem(sparkSession.sessionState.newHadoopConf())
+fs.delete(checkpointPath, true)
+  } catch {
+case NonFatal(e) =>
+  // Deleting temp checkpoint folder is best effort, don't throw 
non fatal exceptions
+  // when we cannot delete them.
+  logWarning(s"Cannot delete $checkpointPath", e)
+  }
+}
   } finally {
 terminationLatch.countDown()
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/3dbff9be/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 

spark git commit: [SPARK-19514] Enhancing the test for Range interruption.

2017-02-13 Thread rxin
Repository: spark
Updated Branches:
  refs/heads/master 1c4d10b10 -> 0417ce878


[SPARK-19514] Enhancing the test for Range interruption.

Improve the test for SPARK-19514, so that it's clear which stage is being 
cancelled.

Author: Ala Luszczak 

Closes #16914 from ala/fix-range-test.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/0417ce87
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/0417ce87
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/0417ce87

Branch: refs/heads/master
Commit: 0417ce8787245791342d5609446f0e2fc4c219b1
Parents: 1c4d10b
Author: Ala Luszczak 
Authored: Mon Feb 13 20:07:39 2017 +0100
Committer: Reynold Xin 
Committed: Mon Feb 13 20:07:39 2017 +0100

--
 .../apache/spark/sql/DataFrameRangeSuite.scala  | 21 ++--
 1 file changed, 10 insertions(+), 11 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/0417ce87/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
index 03bf2d7..acf393a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameRangeSuite.scala
@@ -23,8 +23,8 @@ import scala.util.Random
 
 import org.scalatest.concurrent.Eventually
 
-import org.apache.spark.SparkException
-import org.apache.spark.scheduler.{SparkListener, 
SparkListenerExecutorMetricsUpdate, SparkListenerTaskStart}
+import org.apache.spark.{SparkException, TaskContext}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -137,23 +137,23 @@ class DataFrameRangeSuite extends QueryTest with 
SharedSQLContext with Eventuall
 
   test("Cancelling stage in a query with Range.") {
 val listener = new SparkListener {
-  override def onTaskStart(taskStart: SparkListenerTaskStart): Unit = {
+  override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
 eventually(timeout(10.seconds)) {
-  assert(DataFrameRangeSuite.isTaskStarted)
+  assert(DataFrameRangeSuite.stageToKill > 0)
 }
-sparkContext.cancelStage(taskStart.stageId)
+sparkContext.cancelStage(DataFrameRangeSuite.stageToKill)
   }
 }
 
 sparkContext.addSparkListener(listener)
 for (codegen <- Seq(true, false)) {
   withSQLConf(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key -> 
codegen.toString()) {
-DataFrameRangeSuite.isTaskStarted = false
+DataFrameRangeSuite.stageToKill = -1
 val ex = intercept[SparkException] {
-  spark.range(10L).mapPartitions { x =>
-DataFrameRangeSuite.isTaskStarted = true
+  spark.range(10L).map { x =>
+DataFrameRangeSuite.stageToKill = TaskContext.get().stageId()
 x
-  }.crossJoin(spark.range(100L)).toDF("a", "b").agg(sum("a"), 
sum("b")).collect()
+  }.toDF("id").agg(sum("id")).collect()
 }
 ex.getCause() match {
   case null =>
@@ -172,6 +172,5 @@ class DataFrameRangeSuite extends QueryTest with 
SharedSQLContext with Eventuall
 }
 
 object DataFrameRangeSuite {
-  @volatile var isTaskStarted = false
+  @volatile var stageToKill = -1
 }
-


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19529] TransportClientFactory.createClient() shouldn't call awaitUninterruptibly()

2017-02-13 Thread lian
Repository: spark
Updated Branches:
  refs/heads/master ab88b2410 -> 1c4d10b10


[SPARK-19529] TransportClientFactory.createClient() shouldn't call 
awaitUninterruptibly()

## What changes were proposed in this pull request?

This patch replaces a single `awaitUninterruptibly()` call with a plain 
`await()` call in Spark's `network-common` library in order to fix a bug which 
may cause tasks to be uncancellable.

In Spark's Netty RPC layer, `TransportClientFactory.createClient()` calls 
`awaitUninterruptibly()` on a Netty future while waiting for a connection to be 
established. This creates problem when a Spark task is interrupted while 
blocking in this call (which can happen in the event of a slow connection which 
will eventually time out). This has bad impacts on task cancellation when 
`interruptOnCancel = true`.

As an example of the impact of this problem, I experienced significant numbers 
of uncancellable "zombie tasks" on a production cluster where several tasks 
were blocked trying to connect to a dead shuffle server and then continued 
running as zombies after I cancelled the associated Spark stage. The zombie 
tasks ran for several minutes with the following stack:

```
java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:460)
io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:607)
io.netty.util.concurrent.DefaultPromise.awaitUninterruptibly(DefaultPromise.java:301)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:224)
org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:179)
 => holding Monitor(java.lang.Object1849476028})
org.apache.spark.network.shuffle.ExternalShuffleClient$1.createAndStart(ExternalShuffleClient.java:105)
org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding(RetryingBlockFetcher.java:140)
org.apache.spark.network.shuffle.RetryingBlockFetcher.start(RetryingBlockFetcher.java:120)
org.apache.spark.network.shuffle.ExternalShuffleClient.fetchBlocks(ExternalShuffleClient.java:114)
org.apache.spark.storage.ShuffleBlockFetcherIterator.sendRequest(ShuffleBlockFetcherIterator.scala:169)
org.apache.spark.storage.ShuffleBlockFetcherIterator.fetchUpToMaxBytes(ShuffleBlockFetcherIterator.scala:
350)
org.apache.spark.storage.ShuffleBlockFetcherIterator.initialize(ShuffleBlockFetcherIterator.scala:286)
org.apache.spark.storage.ShuffleBlockFetcherIterator.(ShuffleBlockFetcherIterator.scala:120)
org.apache.spark.shuffle.BlockStoreShuffleReader.read(BlockStoreShuffleReader.scala:45)
org.apache.spark.sql.execution.ShuffledRowRDD.compute(ShuffledRowRDD.scala:169)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
[...]
```

As far as I can tell, `awaitUninterruptibly()` might have been used in order to 
avoid having to declare that methods throw `InterruptedException` (this code is 
written in Java, hence the need to use checked exceptions). This patch simply 
replaces this with a regular, interruptible `await()` call,.

This required several interface changes to declare a new checked exception 
(these are internal interfaces, though, and this change doesn't significantly 
impact binary compatibility).

An alternative approach would be to wrap `InterruptedException` into 
`IOException` in order to avoid having to change interfaces. The problem with 
this approach is that the `network-shuffle` project's `RetryingBlockFetcher` 
code treats `IOExceptions` as transitive failures when deciding whether to 
retry fetches, so throwing a wrapped `IOException` might cause an interrupted 
shuffle fetch to be retried, further prolonging the lifetime of a cancelled 
zombie task.

Note that there are three other `awaitUninterruptibly()` in the codebase, but 
those calls have a hard 10 second timeout and are waiting on a `close()` 
operation which is expected to complete near instantaneously, so the impact of 
uninterruptibility there is much smaller.

## How was this patch tested?

Manually.

Author: Josh Rosen 

Closes #16866 from JoshRosen/SPARK-19529.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c4d10b1
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c4d10b1
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c4d10b1

Branch: refs/heads/master
Commit: 1c4d10b10c78d138b55e381ec6828e04fef70d6f
Parents: ab88b24
Author: Josh Rosen 
Authored: Mon Feb 13 11:04:27 2017 -0800
Committer: Cheng Lian 
Committed: Mon Feb 13 11:04:27 2017 -0800

--
 .../network/client/TransportClientFactory.java  | 10 ++
 .../spark/network/TransportClientFactorySuite.java  |  6 --
 .../network/shuffle/ExternalShuffleClient.java  |  4 ++--
 

spark git commit: [SPARK-19427][PYTHON][SQL] Support data type string as a returnType argument of UDF

2017-02-13 Thread holden
Repository: spark
Updated Branches:
  refs/heads/master 5e7cd3322 -> ab88b2410


[SPARK-19427][PYTHON][SQL] Support data type string as a returnType argument of 
UDF

## What changes were proposed in this pull request?

Add support for data type string as a return type argument of 
`UserDefinedFunction`:

```python
f = udf(lambda x: x, "integer")
 f.returnType

## IntegerType
```

## How was this patch tested?

Existing unit tests, additional unit tests covering new feature.

Author: zero323 

Closes #16769 from zero323/SPARK-19427.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ab88b241
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ab88b241
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ab88b241

Branch: refs/heads/master
Commit: ab88b2410623e5fdb06d558017bd6d50220e466a
Parents: 5e7cd33
Author: zero323 
Authored: Mon Feb 13 10:37:34 2017 -0800
Committer: Holden Karau 
Committed: Mon Feb 13 10:37:34 2017 -0800

--
 python/pyspark/sql/functions.py |  8 +---
 python/pyspark/sql/tests.py | 15 +++
 2 files changed, 20 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ab88b241/python/pyspark/sql/functions.py
--
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 40727ab..5213a3c 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -27,7 +27,7 @@ if sys.version < "3":
 from pyspark import since, SparkContext
 from pyspark.rdd import _prepare_for_python_RDD, ignore_unicode_prefix
 from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.sql.types import StringType
+from pyspark.sql.types import StringType, DataType, _parse_datatype_string
 from pyspark.sql.column import Column, _to_java_column, _to_seq
 from pyspark.sql.dataframe import DataFrame
 
@@ -1865,7 +1865,9 @@ class UserDefinedFunction(object):
 """
 def __init__(self, func, returnType, name=None):
 self.func = func
-self.returnType = returnType
+self.returnType = (
+returnType if isinstance(returnType, DataType)
+else _parse_datatype_string(returnType))
 # Stores UserDefinedPythonFunctions jobj, once initialized
 self._judf_placeholder = None
 self._name = name or (
@@ -1909,7 +1911,7 @@ def udf(f, returnType=StringType()):
 it is present in the query.
 
 :param f: python function
-:param returnType: a :class:`pyspark.sql.types.DataType` object
+:param returnType: a :class:`pyspark.sql.types.DataType` object or data 
type string.
 
 >>> from pyspark.sql.types import IntegerType
 >>> slen = udf(lambda s: len(s), IntegerType())

http://git-wip-us.apache.org/repos/asf/spark/blob/ab88b241/python/pyspark/sql/tests.py
--
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index 710585c..ab9d3f6 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -489,6 +489,21 @@ class SQLTests(ReusedPySparkTestCase):
 "judf should be initialized after UDF has been called."
 )
 
+def test_udf_with_string_return_type(self):
+from pyspark.sql.functions import UserDefinedFunction
+
+add_one = UserDefinedFunction(lambda x: x + 1, "integer")
+make_pair = UserDefinedFunction(lambda x: (-x, x), 
"struct")
+make_array = UserDefinedFunction(
+lambda x: [float(x) for x in range(x, x + 3)], "array")
+
+expected = (2, Row(x=-1, y=1), [1.0, 2.0, 3.0])
+actual = (self.spark.range(1, 2).toDF("x")
+  .select(add_one("x"), make_pair("x"), make_array("x"))
+  .first())
+
+self.assertTupleEqual(expected, actual)
+
 def test_basic_functions(self):
 rdd = self.sc.parallelize(['{"foo":"bar"}', '{"foo":"baz"}'])
 df = self.spark.read.json(rdd)


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19506][ML][PYTHON] Import warnings in pyspark.ml.util

2017-02-13 Thread holden
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 a3b675137 -> ef4fb7ebc


[SPARK-19506][ML][PYTHON] Import warnings in pyspark.ml.util

## What changes were proposed in this pull request?

Add missing `warnings` import.

## How was this patch tested?

Manual tests.

Author: zero323 

Closes #16846 from zero323/SPARK-19506.

(cherry picked from commit 5e7cd3322b04f1dd207829b70546bc7ffdd63363)
Signed-off-by: Holden Karau 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ef4fb7eb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ef4fb7eb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ef4fb7eb

Branch: refs/heads/branch-2.1
Commit: ef4fb7ebca963eb95d6a8bf7543e05aa375edc23
Parents: a3b6751
Author: zero323 
Authored: Mon Feb 13 09:26:49 2017 -0800
Committer: Holden Karau 
Committed: Mon Feb 13 09:36:52 2017 -0800

--
 python/pyspark/ml/util.py | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/ef4fb7eb/python/pyspark/ml/util.py
--
diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py
index c65b3d1..02016f1 100644
--- a/python/pyspark/ml/util.py
+++ b/python/pyspark/ml/util.py
@@ -17,6 +17,7 @@
 
 import sys
 import uuid
+import warnings
 
 if sys.version > '3':
 basestring = str


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19506][ML][PYTHON] Import warnings in pyspark.ml.util

2017-02-13 Thread holden
Repository: spark
Updated Branches:
  refs/heads/master 4321ff9ed -> 5e7cd3322


[SPARK-19506][ML][PYTHON] Import warnings in pyspark.ml.util

## What changes were proposed in this pull request?

Add missing `warnings` import.

## How was this patch tested?

Manual tests.

Author: zero323 

Closes #16846 from zero323/SPARK-19506.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5e7cd332
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5e7cd332
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5e7cd332

Branch: refs/heads/master
Commit: 5e7cd3322b04f1dd207829b70546bc7ffdd63363
Parents: 4321ff9
Author: zero323 
Authored: Mon Feb 13 09:26:49 2017 -0800
Committer: Holden Karau 
Committed: Mon Feb 13 09:26:49 2017 -0800

--
 python/pyspark/ml/util.py | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/5e7cd332/python/pyspark/ml/util.py
--
diff --git a/python/pyspark/ml/util.py b/python/pyspark/ml/util.py
index c65b3d1..02016f1 100644
--- a/python/pyspark/ml/util.py
+++ b/python/pyspark/ml/util.py
@@ -17,6 +17,7 @@
 
 import sys
 import uuid
+import warnings
 
 if sys.version > '3':
 basestring = str


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19544][SQL] Improve error message when some column types are compatible and others are not in set operations

2017-02-13 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 04ad82253 -> 4321ff9ed


[SPARK-19544][SQL] Improve error message when some column types are compatible 
and others are not in set operations

## What changes were proposed in this pull request?

This PR proposes to fix the error message when some data types are compatible 
and others are not in set/union operation.

Currently, the code below:

```scala
Seq((1,("a", 1))).toDF.union(Seq((1L,("a", "b"))).toDF)
```

throws an exception saying `LongType` and `IntegerType` are incompatible types. 
It should say something about `StructType`s with more readable format as below:

**Before**

```
Union can only be performed on tables with the compatible column types.
LongType <> IntegerType at the first column of the second table;;
```

**After**

```
Union can only be performed on tables with the compatible column types.
struct<_1:string,_2:string> <> struct<_1:string,_2:int> at the second column of 
the second table;;
```

*I manually inserted a newline in the messages above for readability only in 
this PR description.

## How was this patch tested?

Unit tests in `AnalysisErrorSuite`, manual tests and build wth Scala 2.10.

Author: hyukjinkwon 

Closes #16882 from HyukjinKwon/SPARK-19544.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4321ff9e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4321ff9e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4321ff9e

Branch: refs/heads/master
Commit: 4321ff9edda4961273ac4a5b02dc1aed03f05e47
Parents: 04ad822
Author: hyukjinkwon 
Authored: Mon Feb 13 16:08:31 2017 +0100
Committer: Herman van Hovell 
Committed: Mon Feb 13 16:08:31 2017 +0100

--
 .../sql/catalyst/analysis/CheckAnalysis.scala   |  6 ++---
 .../sql/catalyst/analysis/TypeCoercion.scala| 24 +++-
 .../catalyst/analysis/AnalysisErrorSuite.scala  | 15 
 .../sql/catalyst/analysis/TestRelations.scala   |  7 ++
 4 files changed, 38 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4321ff9e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
index b4a7c05..532ecb8 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/CheckAnalysis.scala
@@ -321,12 +321,12 @@ trait CheckAnalysis extends PredicateHelper {
   // Check if the data types match.
   dataTypes(child).zip(ref).zipWithIndex.foreach { case ((dt1, 
dt2), ci) =>
 // SPARK-18058: we shall not care about the nullability of 
columns
-if (!dt1.sameType(dt2)) {
+if (TypeCoercion.findWiderTypeForTwo(dt1.asNullable, 
dt2.asNullable).isEmpty) {
   failAnalysis(
 s"""
   |${operator.nodeName} can only be performed on tables 
with the compatible
-  |column types. $dt1 <> $dt2 at the ${ordinalNumber(ci)} 
column of
-  |the ${ordinalNumber(ti + 1)} table
+  |column types. ${dt1.catalogString} <> 
${dt2.catalogString} at the
+  |${ordinalNumber(ci)} column of the ${ordinalNumber(ti + 
1)} table
 """.stripMargin.replace("\n", " ").trim())
 }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4321ff9e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
index b636c31..dfaac92 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercion.scala
@@ -116,17 +116,19 @@ object TypeCoercion {
* i.e. the main difference with [[findTightestCommonType]] is that here we 
allow some
* loss of precision when widening decimal and double, and promotion to 
string.
*/
-  private def findWiderTypeForTwo(t1: DataType, t2: DataType): 
Option[DataType] = (t1, t2) match {
-case (t1: DecimalType, t2: DecimalType) =>
-  Some(DecimalPrecision.widerDecimalType(t1, t2))
-case (t: 

spark git commit: [SPARK-19496][SQL] to_date udf to return null when input date is invalid

2017-02-13 Thread hvanhovell
Repository: spark
Updated Branches:
  refs/heads/master 8f03ad547 -> 04ad82253


[SPARK-19496][SQL] to_date udf to return null when input date is invalid

## What changes were proposed in this pull request?

Currently the udf  `to_date` has different return value with an invalid date 
input.

```
SELECT to_date('2015-07-22', '-dd-MM') ->  return `2016-10-07`
SELECT to_date('2014-31-12')-> return null
```

As discussed in JIRA 
[SPARK-19496](https://issues.apache.org/jira/browse/SPARK-19496), we should 
return null in both situations when the input date is invalid

## How was this patch tested?
unit test added

Author: windpiger 

Closes #16870 from windpiger/to_date.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04ad8225
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04ad8225
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04ad8225

Branch: refs/heads/master
Commit: 04ad822534e8ded96a9ba4b7d43320e53c6d2808
Parents: 8f03ad5
Author: windpiger 
Authored: Mon Feb 13 12:25:13 2017 +0100
Committer: Herman van Hovell 
Committed: Mon Feb 13 12:25:13 2017 +0100

--
 .../spark/sql/catalyst/util/DateTimeUtils.scala |  4 ++
 .../apache/spark/sql/DateFunctionsSuite.scala   | 75 ++--
 2 files changed, 75 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/04ad8225/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
index af70efb..9e1de0f 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/DateTimeUtils.scala
@@ -98,6 +98,10 @@ object DateTimeUtils {
   def newDateFormat(formatString: String, timeZone: TimeZone): DateFormat = {
 val sdf = new SimpleDateFormat(formatString, Locale.US)
 sdf.setTimeZone(timeZone)
+// Enable strict parsing, if the input date/format is invalid, it will 
throw an exception.
+// e.g. to parse invalid date '2016-13-12', or '2016-01-12' with  invalid 
format '-aa-dd',
+// an exception will be throwed.
+sdf.setLenient(false)
 sdf
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/04ad8225/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
--
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
index 618db43..2acda3f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DateFunctionsSuite.scala
@@ -398,14 +398,27 @@ class DateFunctionsSuite extends QueryTest with 
SharedSQLContext {
 Row(Date.valueOf("2014-12-31"
 checkAnswer(
   df.select(to_date(col("s"), "-MM-dd")),
-  Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31")),
-Row(Date.valueOf("2016-07-12"
+  Seq(Row(Date.valueOf("2015-07-22")), Row(Date.valueOf("2014-12-31")), 
Row(null)))
 
 //  now switch format
 checkAnswer(
   df.select(to_date(col("s"), "-dd-MM")),
-  Seq(Row(Date.valueOf("2016-10-07")), Row(Date.valueOf("2016-07-12")),
-Row(Date.valueOf("2014-12-31"
+  Seq(Row(null), Row(null), Row(Date.valueOf("2014-12-31"
+
+// invalid format
+checkAnswer(
+  df.select(to_date(col("s"), "-hh-MM")),
+  Seq(Row(null), Row(null), Row(null)))
+checkAnswer(
+  df.select(to_date(col("s"), "-dd-aa")),
+  Seq(Row(null), Row(null), Row(null)))
+
+// february
+val x1 = "2016-02-29"
+val x2 = "2017-02-29"
+val df1 = Seq(x1, x2).toDF("x")
+checkAnswer(
+  df1.select(to_date(col("x"))), Row(Date.valueOf("2016-02-29")) :: 
Row(null) :: Nil)
   }
 
   test("function trunc") {
@@ -477,6 +490,35 @@ class DateFunctionsSuite extends QueryTest with 
SharedSQLContext {
 checkAnswer(df.selectExpr(s"unix_timestamp(s, '$fmt')"), Seq(
   Row(ts1.getTime / 1000L), Row(ts2.getTime / 1000L)))
 
+val x1 = "2015-07-24 10:00:00"
+val x2 = "2015-25-07 02:02:02"
+val x3 = "2015-07-24 25:02:02"
+val x4 = "2015-24-07 26:02:02"
+val ts3 = Timestamp.valueOf("2015-07-24 02:25:02")
+val ts4 = Timestamp.valueOf("2015-07-24 00:10:00")
+
+val df1 = Seq(x1, x2, x3, x4).toDF("x")
+checkAnswer(df1.select(unix_timestamp(col("x"))), Seq(
+  

spark git commit: [SPARK-19562][BUILD] Added exclude for dev/pr-deps to gitignore

2017-02-13 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 855a1b755 -> 8f03ad547


[SPARK-19562][BUILD] Added exclude for dev/pr-deps to gitignore

## What changes were proposed in this pull request?

Just adding a missing .gitignore entry.

## How was this patch tested?

Entry added, now repo is not dirty anymore after running the build.

Author: Armin Braun 

Closes #16904 from original-brownbear/SPARK-19562.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8f03ad54
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8f03ad54
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8f03ad54

Branch: refs/heads/master
Commit: 8f03ad547895b63825cff751265231b4fb75d660
Parents: 855a1b7
Author: Armin Braun 
Authored: Mon Feb 13 11:22:31 2017 +
Committer: Sean Owen 
Committed: Mon Feb 13 11:22:31 2017 +

--
 .gitignore | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8f03ad54/.gitignore
--
diff --git a/.gitignore b/.gitignore
index 5634a43..1d91b43 100644
--- a/.gitignore
+++ b/.gitignore
@@ -42,6 +42,7 @@ dependency-reduced-pom.xml
 derby.log
 dev/create-release/*final
 dev/create-release/*txt
+dev/pr-deps/
 dist/
 docs/_site
 docs/api


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19574][ML][DOCUMENTATION] Fix Liquid Exception: Start indices amount is not equal to end indices amount

2017-02-13 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/branch-2.1 fe4fcc570 -> a3b675137


[SPARK-19574][ML][DOCUMENTATION] Fix Liquid Exception: Start indices amount is 
not equal to end indices amount

### What changes were proposed in this pull request?
```
Liquid Exception: Start indices amount is not equal to end indices amount, see 
/Users/xiao/IdeaProjects/sparkDelivery/docs/../examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java.
 in ml-features.md
```

So far, the build is broken after merging 
https://github.com/apache/spark/pull/16789

This PR is to fix it.

## How was this patch tested?
Manual

Author: Xiao Li 

Closes #16908 from gatorsmile/docMLFix.

(cherry picked from commit 855a1b7551c71b26ce7d9310342fefe0a87281ec)
Signed-off-by: Sean Owen 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a3b67513
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a3b67513
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a3b67513

Branch: refs/heads/branch-2.1
Commit: a3b6751375cf301dec156b85fe79e32b0797a24f
Parents: fe4fcc5
Author: Xiao Li 
Authored: Mon Feb 13 11:18:31 2017 +
Committer: Sean Owen 
Committed: Mon Feb 13 11:19:11 2017 +

--
 .../java/org/apache/spark/examples/ml/JavaTokenizerExample.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a3b67513/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
index 2fae07a..f42fd33 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
@@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType;
 // col("...") is preferable to df.col("...")
 import static org.apache.spark.sql.functions.callUDF;
 import static org.apache.spark.sql.functions.col;
-// $example off
+// $example off$
 
 public class JavaTokenizerExample {
   public static void main(String[] args) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-19574][ML][DOCUMENTATION] Fix Liquid Exception: Start indices amount is not equal to end indices amount

2017-02-13 Thread srowen
Repository: spark
Updated Branches:
  refs/heads/master 2bdbc8705 -> 855a1b755


[SPARK-19574][ML][DOCUMENTATION] Fix Liquid Exception: Start indices amount is 
not equal to end indices amount

### What changes were proposed in this pull request?
```
Liquid Exception: Start indices amount is not equal to end indices amount, see 
/Users/xiao/IdeaProjects/sparkDelivery/docs/../examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java.
 in ml-features.md
```

So far, the build is broken after merging 
https://github.com/apache/spark/pull/16789

This PR is to fix it.

## How was this patch tested?
Manual

Author: Xiao Li 

Closes #16908 from gatorsmile/docMLFix.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/855a1b75
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/855a1b75
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/855a1b75

Branch: refs/heads/master
Commit: 855a1b7551c71b26ce7d9310342fefe0a87281ec
Parents: 2bdbc87
Author: Xiao Li 
Authored: Mon Feb 13 11:18:31 2017 +
Committer: Sean Owen 
Committed: Mon Feb 13 11:18:31 2017 +

--
 .../java/org/apache/spark/examples/ml/JavaTokenizerExample.java| 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/855a1b75/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
--
diff --git 
a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
index 2fae07a..f42fd33 100644
--- 
a/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaTokenizerExample.java
@@ -39,7 +39,7 @@ import org.apache.spark.sql.types.StructType;
 // col("...") is preferable to df.col("...")
 import static org.apache.spark.sql.functions.callUDF;
 import static org.apache.spark.sql.functions.col;
-// $example off
+// $example off$
 
 public class JavaTokenizerExample {
   public static void main(String[] args) {


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org