GitHub user zhangwei72 opened a pull request:

    https://github.com/apache/spark/pull/17547

    SPARK-20237

    ## What changes were proposed in this pull request?
    
    (Please fill in changes proposed in this fix)
    
    ## How was this patch tested?
    
    (Please explain how this patch was tested. E.g. unit tests, integration 
tests, manual tests)
    (If this patch involves UI changes, please attach a screenshot; otherwise, 
remove this)
    
    Please review http://spark.apache.org/contributing.html before opening a 
pull request.


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/apache/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17547.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17547
    
----
commit 56e1bd337ccb03cb01702e4260e4be59d2aa0ead
Author: Asher Krim <[email protected]>
Date:   2017-03-08T04:36:46Z

    [SPARK-17629][ML] methods to return synonyms directly
    
    ## What changes were proposed in this pull request?
    Provide methods to return synonyms directly, without wrapping them in a 
DataFrame.
    
    In performance-sensitive applications (such as user-facing APIs) the 
round trip to and from DataFrames is costly and unnecessary.
    
    The methods are named ``findSynonymsArray`` to make the return type clear, 
which also implies a local data structure.
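    
    A minimal sketch of what "returning synonyms directly" means, written in
    plain Scala without Spark. The object name, the in-memory `Map` of word
    vectors and the cosine-similarity ranking are assumptions made for
    illustration; this is not the Word2Vec model code itself.
    
    ```scala
    object LocalSynonymsSketch {
      // Cosine similarity between two vectors of equal length.
      private def cosine(a: Array[Double], b: Array[Double]): Double = {
        val dot = a.zip(b).map { case (x, y) => x * y }.sum
        val norm = math.sqrt(a.map(x => x * x).sum) * math.sqrt(b.map(x => x * x).sum)
        if (norm == 0.0) 0.0 else dot / norm
      }
    
      /** Returns the `num` words closest to `word` as a plain local array. */
      def findSynonymsArray(
          vectors: Map[String, Array[Double]],
          word: String,
          num: Int): Array[(String, Double)] = {
        val query = vectors(word)
        vectors.toArray
          .collect { case (w, v) if w != word => (w, cosine(query, v)) }
          .sortWith(_._2 > _._2) // most similar first
          .take(num)
      }
    }
    ```
    
    The caller gets an `Array[(String, Double)]` back and never has to collect
    a DataFrame.
    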
    ## How was this patch tested?
    updated word2vec tests
    
    Author: Asher Krim <[email protected]>
    
    Closes #16811 from Krimit/w2vFindSynonymsLocal.

commit 314e48a3584bad4b486b046bbf0159d64ba857bc
Author: Michael Armbrust <[email protected]>
Date:   2017-03-08T09:32:42Z

    [SPARK-18055][SQL] Use correct mirror in ExpressionEncoder
    
    Previously, we were using the mirror of the passed-in `TypeTag` when 
reflecting to build an encoder.  This fails when the outer class is built in 
(e.g. `Seq`'s default mirror is based on the root classloader) but the inner 
classes (e.g. `A` in `Seq[A]`) are defined in the REPL or a library.
    
    This patch changes us to always reflect based on a mirror created using the 
context classloader.
    
    Author: Michael Armbrust <[email protected]>
    
    Closes #17201 from marmbrus/replSeqEncoder.

commit 1fa58868bc6635ff2119264665bd3d00b4b1253a
Author: Yanbo Liang <[email protected]>
Date:   2017-03-08T10:05:01Z

    [ML][MINOR] Separate estimator and model params for read/write test.
    
    ## What changes were proposed in this pull request?
    Since ```Estimator``` and ```Model``` do not always share the same params 
(see ```ALSParams``` and ```ALSModelParams```), we should pass in test params 
for the estimator and the model separately in function 
```testEstimatorAndModelReadWrite```.
    
    ## How was this patch tested?
    Existing tests.
    
    Author: Yanbo Liang <[email protected]>
    
    Closes #17151 from yanboliang/test-rw.

commit 81303f7ca7808d51229411dce8feeed8c23dbe15
Author: Yanbo Liang <[email protected]>
Date:   2017-03-08T10:09:36Z

    [SPARK-19806][ML][PYSPARK] PySpark GeneralizedLinearRegression supports 
tweedie distribution.
    
    ## What changes were proposed in this pull request?
    PySpark ```GeneralizedLinearRegression``` supports tweedie distribution.
    
    ## How was this patch tested?
    Add unit tests.
    
    Author: Yanbo Liang <[email protected]>
    
    Closes #17146 from yanboliang/spark-19806.

commit 3f9f9180c2e695ad468eb813df5feec41e169531
Author: Yuming Wang <[email protected]>
Date:   2017-03-08T11:31:01Z

    [SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically 
converted to spark.sql.shuffle.partitions
    
    ## What changes were proposed in this pull request?
    Automatically convert `SET mapreduce.job.reduces` to 
`spark.sql.shuffle.partitions`, in the same way as `SET mapred.reduce.tasks`.
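    
    The conversion can be pictured as a key translation applied before the
    value is stored. This is a rough sketch in plain Scala; the object and
    method names are hypothetical and it does not reproduce the real SET
    command code.
    
    ```scala
    object ShuffleConfSketch {
      val ShufflePartitions = "spark.sql.shuffle.partitions"
    
      // Hadoop-style keys treated as aliases for the Spark SQL setting.
      val ReducerAliases = Set("mapred.reduce.tasks", "mapreduce.job.reduces")
    
      /** Returns the (key, value) pair that should actually be stored. */
      def translate(key: String, value: String): (String, String) =
        if (ReducerAliases.contains(key)) (ShufflePartitions, value) else (key, value)
    }
    ```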
    
    ## How was this patch tested?
    
    unit tests
    
    Author: Yuming Wang <[email protected]>
    
    Closes #17020 from wangyum/SPARK-19693.

commit 9ea201cf6482c9c62c9428759d238063db62d66e
Author: Anthony Truchet <[email protected]>
Date:   2017-03-08T11:44:25Z

    [SPARK-16440][MLLIB] Ensure broadcasted variables are destroyed even in 
case of exception
    
    ## What changes were proposed in this pull request?
    
    Ensure broadcasted variables are destroyed even in case of exception
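    
    The general pattern is a `try`/`finally` around the code that uses the
    broadcast. The sketch below uses a stand-in class (`FakeBroadcast`, an
    assumption for illustration) instead of a real Spark broadcast so that it
    runs without a cluster.
    
    ```scala
    object BroadcastCleanupSketch {
      // Stand-in for a broadcast variable; it only records whether destroy() ran.
      final class FakeBroadcast[T](val value: T) {
        var destroyed = false
        def destroy(): Unit = { destroyed = true }
      }
    
      /** Runs `body` and always destroys the broadcast, even if `body` throws. */
      def withBroadcast[T, R](bc: FakeBroadcast[T])(body: FakeBroadcast[T] => R): R =
        try body(bc) finally bc.destroy()
    }
    ```
    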
    ## How was this patch tested?
    
    Word2VecSuite was run locally
    
    Author: Anthony Truchet <[email protected]>
    
    Closes #14299 from AnthonyTruchet/SPARK-16440.

commit e44274870dee308f4e3e8ce79457d8d19693b6e5
Author: wangzhenhua <[email protected]>
Date:   2017-03-08T15:01:28Z

    [SPARK-17080][SQL] join reorder
    
    ## What changes were proposed in this pull request?
    
    Reorder the joins using a dynamic programming algorithm (Selinger paper):
    First we put all items (basic joined nodes) into level 1, then we build all 
two-way joins at level 2 from plans at level 1 (single items), then build all 
3-way joins from plans at previous levels (two-way joins and single items), 
then 4-way joins ... etc, until we build all n-way joins and pick the best plan 
among them.
    
    When building m-way joins, we only keep the best plan (with the lowest 
cost) for the same set of m items. E.g., for 3-way joins, we keep only the best 
plan for items {A, B, C} among plans (A J B) J C, (A J C) J B and (B J C) J A. 
Thus, the plans maintained for each level when reordering four items A, B, C, D 
are as follows:
    ```
    level 1: p({A}), p({B}), p({C}), p({D})
    level 2: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D})
    level 3: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D})
    level 4: p({A, B, C, D})
    ```
    where p({A, B, C, D}) is the final output plan.
    
    For cost evaluation, since physical costs for operators are not available 
currently, we use cardinalities and sizes to compute costs.
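    
    The level-by-level search can be sketched in plain Scala as follows. The
    cost model here is a toy assumption (every join keeps one tenth of the
    cross product, and cost is the sum of intermediate row counts); the names
    `Plan` and `reorder` are made up for the example and are not the
    optimizer's classes.
    
    ```scala
    object JoinReorderSketch {
      // A plan over a set of items, with a toy row estimate and accumulated cost.
      final case class Plan(items: Set[String], rows: BigInt, cost: BigInt, tree: String)
    
      // Toy assumption: every join keeps 1/10 of the cross product.
      private def join(l: Plan, r: Plan): Plan = {
        val rows = l.rows * r.rows / 10
        Plan(l.items ++ r.items, rows, l.cost + r.cost + rows, s"(${l.tree} J ${r.tree})")
      }
    
      def reorder(cardinalities: Map[String, BigInt]): Plan = {
        val n = cardinalities.size
        // Level 1: one plan per single item.
        var best: Map[Set[String], Plan] = cardinalities.map { case (name, rows) =>
          Set(name) -> Plan(Set(name), rows, BigInt(0), name)
        }
        // Level m: combine disjoint best plans from lower levels and keep
        // only the cheapest plan for each set of m items.
        for (m <- 2 to n) {
          val candidates = for {
            l <- best.values.toSeq
            r <- best.values.toSeq
            if l.items.size + r.items.size == m && (l.items & r.items).isEmpty
          } yield join(l, r)
          val levelBest = candidates.groupBy(_.items).map { case (k, ps) =>
            k -> ps.minBy(_.cost)
          }
          best = best ++ levelBest
        }
        best(cardinalities.keySet)
      }
    }
    ```
    
    With three items of 1000, 10 and 100 rows, the sketch joins the two
    smallest inputs first, because that keeps the intermediate result small.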
    
    ## How was this patch tested?
    add test cases
    
    Author: wangzhenhua <[email protected]>
    Author: Zhenhua Wang <[email protected]>
    
    Closes #17138 from wzhfy/joinReorder.

commit 5f7d835d380c1a558a4a6d8366140cd96ee202eb
Author: jiangxingbo <[email protected]>
Date:   2017-03-08T15:18:17Z

    [SPARK-19865][SQL] remove the view identifier in SubqueryAlias
    
    ## What changes were proposed in this pull request?
    
    Since we have a `View` node now, we can remove the view identifier in 
`SubqueryAlias`, which was used to indicate a view node before.
    
    ## How was this patch tested?
    
    Update the related test cases.
    
    Author: jiangxingbo <[email protected]>
    
    Closes #17210 from jiangxb1987/SubqueryAlias.

commit 9a6ac7226fd09d570cae08d0daea82d9bca189a0
Author: Xiao Li <[email protected]>
Date:   2017-03-08T17:36:01Z

    [SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-enabled 
Repartition
    
    ### What changes were proposed in this pull request?
    
    Observed by felixcheung in https://github.com/apache/spark/pull/16739: 
when users call the shuffle-enabled `repartition` API, they expect the 
resulting number of partitions to be exactly the number they provided, even if 
they call the shuffle-disabled `coalesce` later.
    
    Currently, `CollapseRepartition` rule does not consider whether shuffle is 
enabled or not. Thus, we got the following unexpected result.
    
    ```Scala
        val df = spark.range(0, 10000, 1, 5)
        val df2 = df.repartition(10)
        assert(df2.coalesce(13).rdd.getNumPartitions == 5)
        assert(df2.coalesce(7).rdd.getNumPartitions == 5)
        assert(df2.coalesce(3).rdd.getNumPartitions == 3)
    ```
    
    This PR is to fix the issue. We preserve shuffle-enabled Repartition.
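    
    One way to picture a shuffle-aware collapse rule is the toy model below,
    in plain Scala. The `Plan` ADT and both functions are assumptions for
    illustration and are simpler than the real optimizer rule; the point is
    only that a `coalesce` on top of a `repartition` must not erase the
    `repartition`.
    
    ```scala
    object CollapseRepartitionSketch {
      sealed trait Plan
      final case class Source(numPartitions: Int) extends Plan
      // shuffle = true models repartition(n); shuffle = false models coalesce(n).
      final case class Repartition(numPartitions: Int, shuffle: Boolean, child: Plan) extends Plan
    
      /** Collapses adjacent Repartition nodes without dropping a requested shuffle. */
      def collapse(plan: Plan): Plan = plan match {
        case Repartition(n, false, child @ Repartition(m, true, _)) =>
          // coalesce over repartition: coalesce cannot add partitions, so it is
          // a no-op when n >= m; otherwise both nodes have to stay.
          if (n >= m) collapse(child) else Repartition(n, false, collapse(child))
        case Repartition(n, false, Repartition(m, false, grandChild)) =>
          // coalesce over coalesce: the smaller target wins.
          collapse(Repartition(math.min(n, m), false, grandChild))
        case Repartition(n, true, Repartition(_, _, grandChild)) =>
          // repartition on top: the outer shuffle alone decides the result.
          collapse(Repartition(n, true, grandChild))
        case Repartition(n, shuffle, child) => Repartition(n, shuffle, collapse(child))
        case source: Source => source
      }
    
      /** Partition count the toy plan would produce. */
      def numPartitions(plan: Plan): Int = plan match {
        case Source(n) => n
        case Repartition(n, true, _) => n
        case Repartition(n, false, child) => math.min(n, numPartitions(child))
      }
    }
    ```
    
    In this model, `coalesce(13)`, `coalesce(7)` and `coalesce(3)` on top of
    `repartition(10)` give 10, 7 and 3 partitions respectively.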
    
    ### How was this patch tested?
    Added a test case
    
    Author: Xiao Li <[email protected]>
    
    Closes #16933 from gatorsmile/CollapseRepartition.

commit e420fd4592615d91cdcbca674ac58bcca6ab2ff3
Author: Tejas Patil <[email protected]>
Date:   2017-03-08T17:38:05Z

    [SPARK-19843][SQL][FOLLOWUP] Classdoc for `IntWrapper` and `LongWrapper`
    
    ## What changes were proposed in this pull request?
    
    This is as per suggestion by rxin at : 
https://github.com/apache/spark/pull/17184#discussion_r104841735
    
    ## How was this patch tested?
    
    NA as this is a documentation change
    
    Author: Tejas Patil <[email protected]>
    
    Closes #17205 from tejasapatil/SPARK-19843_followup.

commit f3387d97487cbef894b6963bc008f6a5c4294a85
Author: windpiger <[email protected]>
Date:   2017-03-08T18:48:53Z

    [SPARK-19864][SQL][TEST] provide a makeQualifiedPath function to optimize 
some code
    
    ## What changes were proposed in this pull request?
    
    Currently there are many places that make a path qualified. It is better 
to provide a function to do this, which makes the code simpler.
    
    ## How was this patch tested?
    N/A
    
    Author: windpiger <[email protected]>
    
    Closes #17204 from windpiger/addQualifiledPathUtil.

commit e9e2c612d58a19ddcb4b6abfb7389a4b0f7ef6f8
Author: Wojtek Szymanski <[email protected]>
Date:   2017-03-08T20:36:16Z

    [SPARK-19727][SQL] Fix for round function that modifies original column
    
    ## What changes were proposed in this pull request?
    
    Fix for SQL round function that modifies original column when underlying 
data frame is created from a local product.
    
        import org.apache.spark.sql.functions._
    
        case class NumericRow(value: BigDecimal)
    
        val df = 
spark.createDataFrame(Seq(NumericRow(BigDecimal("1.23456789"))))
    
        df.show()
        +--------------------+
        |               value|
        +--------------------+
        |1.234567890000000000|
        +--------------------+
    
        df.withColumn("value_rounded", round('value)).show()
    
        // before
        +--------------------+-------------+
        |               value|value_rounded|
        +--------------------+-------------+
        |1.000000000000000000|            1|
        +--------------------+-------------+
    
        // after
        +--------------------+-------------+
        |               value|value_rounded|
        +--------------------+-------------+
        |1.234567890000000000|            1|
        +--------------------+-------------+
    
    ## How was this patch tested?
    
    New unit test added to existing suite 
`org.apache.spark.sql.MathFunctionsSuite`
    
    Author: Wojtek Szymanski <[email protected]>
    
    Closes #17075 from wojtek-szymanski/SPARK-19727.

commit 1bf9012380de2aa7bdf39220b55748defde8b700
Author: Shixiong Zhu <[email protected]>
Date:   2017-03-08T21:18:07Z

    [SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow 
invalid cases
    
    ## What changes were proposed in this pull request?
    
    Add an output mode parameter to `flatMapGroupsWithState` and just define 
`mapGroupsWithState` as `flatMapGroupsWithState(Update)`.
    
    `UnsupportedOperationChecker` is modified to disallow unsupported cases.
    
    - Batch mapGroupsWithState or flatMapGroupsWithState is always allowed.
    - For streaming (map/flatMap)GroupsWithState, see the following table:
    
    | Operators  | Supported Query Output Mode |
    | ------------- | ------------- |
    | flatMapGroupsWithState(Update) without aggregation  | Update |
    | flatMapGroupsWithState(Update) with aggregation  | None |
    | flatMapGroupsWithState(Append) without aggregation  | Append |
    | flatMapGroupsWithState(Append) before aggregation  | Append, Update, Complete |
    | flatMapGroupsWithState(Append) after aggregation  | None |
    | Multiple flatMapGroupsWithState(Append)s  | Append |
    | Multiple mapGroupsWithStates  | None |
    | Mixing mapGroupsWithStates and flatMapGroupsWithStates | None |
    | Other cases of multiple flatMapGroupsWithState | None |
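    
    The single-operator rows of the table can be encoded as a small lookup,
    sketched here in plain Scala. The type and function names are hypothetical
    and the rows about multiple operators are left out; this is not the
    `UnsupportedOperationChecker` code.
    
    ```scala
    object StatefulOpCheckSketch {
      sealed trait Mode
      case object Append extends Mode
      case object Update extends Mode
      case object Complete extends Mode
    
      sealed trait Aggregation
      case object NoAggregation extends Aggregation
      case object BeforeAggregation extends Aggregation // operator feeds an aggregation
      case object AfterAggregation extends Aggregation  // operator consumes an aggregation
    
      /** Query output modes allowed for one streaming flatMapGroupsWithState(funcMode). */
      def supportedQueryModes(funcMode: Mode, agg: Aggregation): Set[Mode] =
        (funcMode, agg) match {
          case (Update, NoAggregation) => Set(Update)
          case (Append, NoAggregation) => Set(Append)
          case (Append, BeforeAggregation) => Set(Append, Update, Complete)
          case _ => Set.empty // the "None" rows of the table
        }
    }
    ```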
    
    ## How was this patch tested?
    
    The added unit tests. Here are the tests related to 
(map/flatMap)GroupsWithState:
    ```
    [info] - batch plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on batch relation: supported (1 millisecond)
    [info] - batch plan - flatMapGroupsWithState - multiple 
flatMapGroupsWithState(Append)s on batch relation: supported (0 milliseconds)
    [info] - batch plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on batch relation: supported (0 milliseconds)
    [info] - batch plan - flatMapGroupsWithState - multiple 
flatMapGroupsWithState(Update)s on batch relation: supported (0 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation without aggregation in 
update mode: supported (2 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation without aggregation in 
append mode: not supported (7 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation without aggregation in 
complete mode: not supported (5 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation with aggregation in Append 
mode: not supported (11 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation with aggregation in Update 
mode: not supported (5 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation with aggregation in 
Complete mode: not supported (5 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation without aggregation in 
append mode: supported (1 millisecond)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation without aggregation in 
update mode: not supported (6 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation before aggregation in 
Append mode: supported (1 millisecond)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation before aggregation in 
Update mode: supported (0 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation before aggregation in 
Complete mode: supported (1 millisecond)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation after aggregation in 
Append mode: not supported (6 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on streaming relation after aggregation in 
Update mode: not supported (4 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on streaming relation in complete mode: not 
supported (2 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on batch relation inside streaming relation in 
Append output mode: supported (1 millisecond)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Append) on batch relation inside streaming relation in 
Update output mode: supported (1 millisecond)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on batch relation inside streaming relation in 
Append output mode: supported (0 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - 
flatMapGroupsWithState(Update) on batch relation inside streaming relation in 
Update output mode: supported (0 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState - multiple 
flatMapGroupsWithStates on streaming relation and all are in append mode: 
supported (2 milliseconds)
    [info] - streaming plan - flatMapGroupsWithState -  multiple 
flatMapGroupsWithStates on s streaming relation but some are not in append 
mode: not supported (7 milliseconds)
    [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on 
streaming relation without aggregation in append mode: not supported (3 
milliseconds)
    [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on 
streaming relation without aggregation in complete mode: not supported (3 
milliseconds)
    [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on 
streaming relation with aggregation in Append mode: not supported (6 
milliseconds)
    [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on 
streaming relation with aggregation in Update mode: not supported (3 
milliseconds)
    [info] - streaming plan - mapGroupsWithState - mapGroupsWithState on 
streaming relation with aggregation in Complete mode: not supported (4 
milliseconds)
    [info] - streaming plan - mapGroupsWithState - multiple mapGroupsWithStates 
on streaming relation and all are in append mode: not supported (4 milliseconds)
    [info] - streaming plan - mapGroupsWithState - mixing mapGroupsWithStates 
and flatMapGroupsWithStates on streaming relation: not supported (4 
milliseconds)
    ```
    
    Author: Shixiong Zhu <[email protected]>
    
    Closes #17197 from zsxwing/mapgroups-check.

commit 6570cfd7abe349dc6d2151f2ac9dc662e7465a79
Author: Kunal Khamar <[email protected]>
Date:   2017-03-08T21:06:22Z

    [SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session 
has an identical copy of the SessionState
    
    Forking a newSession() from SparkSession currently makes a new SparkSession 
that does not retain SessionState (i.e. temporary tables, SQL config, 
registered functions etc.). This change adds a method cloneSession() which 
creates a new SparkSession with a copy of the parent's SessionState.
    
    Subsequent changes to the base session are not propagated to the cloned 
session; the clone is independent after creation.
    If the base is changed after the clone has been created, say the user 
registers a new UDF, then the new UDF will not be available inside the clone. 
The same goes for configs and temp tables.
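    
    The copy semantics can be illustrated with a toy model in plain Scala.
    `State` and `Session` below are stand-ins invented for the example (they
    are not the Spark classes); the clone starts from a snapshot of the
    parent's state and then evolves on its own.
    
    ```scala
    object SessionCloneSketch {
      // Toy stand-in for SessionState: SQL confs, temp views and function names.
      final case class State(
          conf: Map[String, String],
          tempViews: Set[String],
          functions: Set[String])
    
      final class Session(private var state: State) {
        def registerFunction(name: String): Unit = {
          state = state.copy(functions = state.functions + name)
        }
        def hasFunction(name: String): Boolean = state.functions.contains(name)
    
        // newSession-like: nothing session-scoped is carried over.
        def newSession(): Session = new Session(State(Map.empty, Set.empty, Set.empty))
    
        // cloneSession-like: starts from a snapshot of the parent's state.
        def cloneSession(): Session = new Session(state)
      }
    }
    ```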
    
    Unit tests
    
    Author: Kunal Khamar <[email protected]>
    Author: Shixiong Zhu <[email protected]>
    
    Closes #16826 from kunalkhamar/fork-sparksession.

commit 455129020ca7f6a162f6f2486a87cc43512cfd2c
Author: hyukjinkwon <[email protected]>
Date:   2017-03-08T21:43:09Z

    [SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String] 
storing CSV
    
    ## What changes were proposed in this pull request?
    
    This PR proposes to add an API that loads `DataFrame` from 
`Dataset[String]` storing csv.
    
    It allows pre-processing before the data is parsed as CSV, which enables 
workarounds for many narrow cases, for example, as below:
    
    - Case 1 - pre-processing
    
      ```scala
      val df = spark.read.text("...")
      // Pre-processing with this.
      spark.read.csv(df.as[String])
      ```
    
    - Case 2 - use other input formats
    
      ```scala
      val rdd = spark.sparkContext.newAPIHadoopFile("/file.csv.lzo",
        classOf[com.hadoop.mapreduce.LzoTextInputFormat],
        classOf[org.apache.hadoop.io.LongWritable],
        classOf[org.apache.hadoop.io.Text])
      val stringRdd = rdd.map(pair => new String(pair._2.getBytes, 0, 
pair._2.getLength))
    
      spark.read.csv(stringRdd.toDS)
      ```
    
    ## How was this patch tested?
    
    Added tests in `CSVSuite` and build with Scala 2.10.
    
    ```
    ./dev/change-scala-version.sh 2.10
    ./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
    ```
    
    Author: hyukjinkwon <[email protected]>
    
    Closes #16854 from HyukjinKwon/SPARK-15463.

commit a3648b5d4f99ff9461d02f53e9ec71787a3abf51
Author: Burak Yavuz <[email protected]>
Date:   2017-03-08T22:35:07Z

    [SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in 
combination with maxFileAge in FileStreamSource
    
    ## What changes were proposed in this pull request?
    
    **The Problem**
    There is a file stream source option called maxFileAge which limits how old 
the files can be, relative to the latest file that has been seen. This is used 
to limit the files that need to be remembered as "processed". Files older than 
the latest processed file by more than maxFileAge are ignored. This value is 7 
days by default.
    This causes a problem when both
    latestFirst = true
    maxFilesPerTrigger < total files to be processed.
    Here is what happens in all combinations
    1) latestFirst = false - Since files are processed in order, there won't be 
any unprocessed file older than the latest processed file. All files will be 
processed.
    2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge 
thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is 
not set, then all old files get processed in the first batch, and so no file is 
left behind.
    3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch 
processes the latest X files. That sets the threshold to latest file - 
maxFileAge, so files older than this threshold will never be considered for 
processing.
    The bug is with case 3.
    
    **The Solution**
    
    Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are 
set.
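    
    A rough sketch of the resulting rule in plain Scala. The `Options` case
    class and both functions are hypothetical names for illustration, and
    using `Long.MaxValue` to mean "no age limit" is an assumption of this
    sketch rather than a statement about the actual source code.
    
    ```scala
    object MaxFileAgeSketch {
      final case class Options(
          latestFirst: Boolean,
          maxFilesPerTrigger: Option[Int],
          maxFileAgeMs: Long)
    
      /** Age limit actually applied: unlimited when both options are set. */
      def effectiveMaxFileAgeMs(o: Options): Long =
        if (o.latestFirst && o.maxFilesPerTrigger.isDefined) Long.MaxValue
        else o.maxFileAgeMs
    
      /** A file is too old if it is older than (latest seen timestamp - age limit). */
      def isTooOld(fileTsMs: Long, latestSeenTsMs: Long, o: Options): Boolean = {
        val limit = effectiveMaxFileAgeMs(o)
        limit != Long.MaxValue && fileTsMs < latestSeenTsMs - limit
      }
    }
    ```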
    
    ## How was this patch tested?
    
    Regression test in `FileStreamSourceSuite`
    
    Author: Burak Yavuz <[email protected]>
    
    Closes #17153 from brkyvz/maxFileAge.

commit d809ceed9762d5bbb04170e45f38751713112dd8
Author: Dilip Biswal <[email protected]>
Date:   2017-03-09T01:33:49Z

    [MINOR][SQL] The analyzer rules are fired twice for cases when 
AnalysisException is raised from analyzer.
    
    ## What changes were proposed in this pull request?
    In general we have a checkAnalysis phase which validates the logical plan 
and throws AnalysisException on semantic errors. However we also can throw 
AnalysisException from a few analyzer rules like ResolveSubquery.
    
    I found that we fire the analyzer rules twice for queries that throw 
AnalysisException from one of the analyzer rules. This is a very minor fix. We 
don't have to strictly fix it. I just got confused seeing the rule getting 
fired two times when I was not expecting it.
    
    ## How was this patch tested?
    
    Tested manually.
    
    Author: Dilip Biswal <[email protected]>
    
    Closes #17214 from dilipbiswal/analyis_twice.

commit 09829be621f0f9bb5076abb3d832925624699fa9
Author: Xiao Li <[email protected]>
Date:   2017-03-09T07:12:10Z

    [SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive Metastore
    
    ### What changes were proposed in this pull request?
    So far, the test cases in DDLSuite only verify the behavior of 
InMemoryCatalog. That means they do not cover the scenarios using 
HiveExternalCatalog. Thus, we need to improve the existing test suite to run 
these cases using Hive metastore.
    
    When porting these test cases, a bug in `SET LOCATION` was found: `path` is 
not set when the location is changed.
    
    After this PR, a few changes are made, as summarized below,
    - `DDLSuite` becomes an abstract class. Both `InMemoryCatalogedDDLSuite` 
and `HiveCatalogedDDLSuite` extend it. `InMemoryCatalogedDDLSuite` is using 
`InMemoryCatalog`. `HiveCatalogedDDLSuite` is using `HiveExternalCatalog`.
    - `InMemoryCatalogedDDLSuite` contains all the existing test cases in 
`DDLSuite`.
    - `HiveCatalogedDDLSuite` contains a subset of `DDLSuite`. The following 
test cases are excluded:
    
    1. The following test cases only make sense for `InMemoryCatalog`:
    ```
      test("desc table for parquet data source table using in-memory catalog")
      test("create a managed Hive source table")
      test("create an external Hive source table")
      test("Create Hive Table As Select")
    ```
    
    2. The following test cases cannot be ported because we are unable 
to alter table provider when using Hive metastore. In future PRs we need to 
improve the test cases so that altering table provider is not needed:
    ```
      test("alter table: set location (datasource table)")
      test("alter table: set properties (datasource table)")
      test("alter table: unset properties (datasource table)")
      test("alter table: set serde (datasource table)")
      test("alter table: set serde partition (datasource table)")
      test("alter table: change column (datasource table)")
      test("alter table: add partition (datasource table)")
      test("alter table: drop partition (datasource table)")
      test("alter table: rename partition (datasource table)")
      test("drop table - data source table")
    ```
    
    **TODO**: in future PRs, we need to remove `HiveDDLSuite` and move the 
test cases to either `DDLSuite`, `InMemoryCatalogedDDLSuite` or 
`HiveCatalogedDDLSuite`.
    
    ### How was this patch tested?
    N/A
    
    Author: Xiao Li <[email protected]>
    Author: gatorsmile <[email protected]>
    
    Closes #16592 from gatorsmile/refactorDDLSuite.

commit 029e40b412e332c9f0fff283d604e203066c78c0
Author: Shixiong Zhu <[email protected]>
Date:   2017-03-09T07:15:52Z

    [SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internal
    
    ## What changes were proposed in this pull request?
    
    The API docs should not include the "org.apache.spark.sql.internal" package 
because they are internal private APIs.
    
    ## How was this patch tested?
    
    Jenkins
    
    Author: Shixiong Zhu <[email protected]>
    
    Closes #17217 from zsxwing/SPARK-19874.

commit eeb1d6db878641d9eac62d0869a90fe80c1f4461
Author: uncleGen <[email protected]>
Date:   2017-03-09T07:23:10Z

    [SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.
    
    ## What changes were proposed in this pull request?
    
    A follow up to SPARK-19859:
    
    - extract the calculation of `delayMs` and reuse it.
    - update EventTimeWatermarkExec
    - use the correct `delayMs` in EventTimeWatermark
    
    ## How was this patch tested?
    
    Jenkins.
    
    Author: uncleGen <[email protected]>
    
    Closes #17221 from uncleGen/SPARK-19859.

commit 274973d2a32ff4eb28545b50a3135e4784eb3047
Author: windpiger <[email protected]>
Date:   2017-03-09T09:18:17Z

    [SPARK-19763][SQL] qualified external datasource table location stored in 
catalog
    
    ## What changes were proposed in this pull request?
    
    If we create an external datasource table with a non-qualified location, we 
should qualify it before storing it in the catalog.
    
    ```
    CREATE TABLE t(a string)
    USING parquet
    LOCATION '/path/xx'
    
    CREATE TABLE t1(a string, b string)
    USING parquet
    PARTITIONED BY(b)
    LOCATION '/path/xx'
    ```
    
    When we get the table from the catalog, the location should be qualified, 
e.g. 'file:/path/xx'.
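    
    What "qualifying" a location means can be sketched with `java.net.URI`
    alone. The helper below is a simplified stand-in written for this note
    (Spark relies on Hadoop's filesystem classes for the real thing); it
    assumes an absolute path and a local default filesystem.
    
    ```scala
    import java.net.URI
    
    object QualifyPathSketch {
      /** Adds the default scheme to a location that has none; qualified URIs pass through. */
      def qualify(location: String, defaultScheme: String = "file"): URI = {
        val uri = new URI(location)
        if (uri.getScheme != null) uri
        else new URI(defaultScheme, null, uri.getPath, null) // assumes an absolute path
      }
    }
    ```
    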
    ## How was this patch tested?
    unit test added
    
    Author: windpiger <[email protected]>
    
    Closes #17095 from windpiger/tablepathQualified.

commit 206030bd12405623c00c1ff334663984b9250adb
Author: Jason White <[email protected]>
Date:   2017-03-09T18:34:54Z

    [SPARK-19561][SQL] add int case handling for TimestampType
    
    ## What changes were proposed in this pull request?
    
    Add handling of input of type `Int` for dataType `TimestampType` to 
`EvaluatePython.scala`. Py4J serializes ints smaller than MIN_INT or larger 
than MAX_INT to Long, which are handled correctly already, but values between 
MIN_INT and MAX_INT are serialized to Int.
    
    These range limits correspond to roughly half an hour on either side of the 
epoch. As a result, PySpark doesn't allow TimestampType values to be created in 
this range.
    
    Alternatives attempted: patching the `TimestampType.toInternal` function to 
cast return values to `long`, so Py4J would always serialize them to Scala 
Long. Python3 does not have a `long` type, so this approach failed on Python3.
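    
    The shape of the fix can be sketched as one extra pattern-match case.
    This is plain Scala with a hypothetical function name, not the code in
    `EvaluatePython.scala`; it only shows that both boxed `Int` and boxed
    `Long` inputs end up as a `Long` number of microseconds.
    
    ```scala
    object TimestampFromPythonSketch {
      /** Converts a value received from Py4J into microseconds since the epoch. */
      def toMicros(obj: Any): Option[Long] = obj match {
        case i: Int => Some(i.toLong) // values inside the Int range arrive as Int
        case l: Long => Some(l)       // values outside the Int range arrive as Long
        case _ => None                // anything else is not a timestamp
      }
    }
    ```
    
    `Int.MaxValue` microseconds is about 2147 seconds, which is where the
    "roughly half an hour" figure comes from.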
    
    ## How was this patch tested?
    
    Added a new PySpark-side test that fails without the change.
    
    The contribution is my original work and I license the work to the project 
under the project’s open source license.
    
    Resubmission of https://github.com/apache/spark/pull/16896. The original PR 
didn't go through Jenkins and broke the build. davies dongjoon-hyun
    
    cloud-fan Could you kick off a Jenkins run for me? It passed everything for 
me locally, but it's possible something has changed in the last few weeks.
    
    Author: Jason White <[email protected]>
    
    Closes #17200 from JasonMWhite/SPARK-19561.

commit b60b9fc10a1ee52c3c021a4a5faf10f92f83e3c9
Author: Jimmy Xiang <[email protected]>
Date:   2017-03-09T18:52:18Z

    [SPARK-19757][CORE] DriverEndpoint#makeOffers race against 
CoarseGrainedSchedulerBackend#killExecutors
    
    ## What changes were proposed in this pull request?
    While some executors are being killed due to idleness, if some new tasks 
come in, the driver could assign them to executors that are being killed. These 
tasks will fail later when the executors are lost. This patch makes sure 
CoarseGrainedSchedulerBackend#killExecutors and DriverEndpoint#makeOffers are 
properly synchronized.
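    
    A toy model of the idea, in plain Scala: both operations take the same
    lock, and offers skip executors that are already marked for removal. The
    `Backend` class and its fields are assumptions for illustration, not the
    scheduler backend itself.
    
    ```scala
    import scala.collection.mutable
    
    object ExecutorOfferSketch {
      final class Backend(initialExecutors: Set[String]) {
        private val alive = mutable.Set.empty[String] ++= initialExecutors
        private val pendingToRemove = mutable.Set.empty[String]
    
        /** Marks executors as being killed; guarded by the same lock as makeOffers. */
        def killExecutors(ids: Seq[String]): Unit = synchronized {
          pendingToRemove ++= ids.filter(alive.contains)
        }
    
        /** Offers only executors that are alive and not already being killed. */
        def makeOffers(): Set[String] = synchronized {
          alive.filterNot(pendingToRemove.contains).toSet
        }
      }
    }
    ```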
    
    ## How was this patch tested?
    manual tests
    
    Author: Jimmy Xiang <[email protected]>
    
    Closes #17091 from jxiang/spark-19757.

commit 3232e54f2fcb8d2072cba4bc763ef29d5d8d325f
Author: jinxing <[email protected]>
Date:   2017-03-09T18:56:19Z

    [SPARK-19793] Use clock.getTimeMillis when marking a task as finished in 
TaskSetManager.
    
    ## What changes were proposed in this pull request?
    
    TaskSetManager currently uses `System.currentTimeMillis` when marking a 
task as finished in `handleSuccessfulTask` and `handleFailedTask`. Thus 
developers cannot set a task's finishing time in unit tests. In 
`handleSuccessfulTask`, the task's duration = `System.currentTimeMillis` - 
launchTime (which can be set by `clock`), so the result is not correct.
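    
    Why a single injected clock matters can be shown with a small stand-alone
    sketch. The `Clock`, `ManualClock` and `TaskTimer` types below are
    simplified stand-ins defined for this example; they mirror the idea, not
    the exact Spark utilities.
    
    ```scala
    object TaskClockSketch {
      trait Clock { def getTimeMillis(): Long }
    
      object SystemClock extends Clock {
        def getTimeMillis(): Long = System.currentTimeMillis()
      }
    
      // Test clock whose time only moves when advance() is called.
      final class ManualClock(private var now: Long) extends Clock {
        def getTimeMillis(): Long = now
        def advance(ms: Long): Unit = { now += ms }
      }
    
      final class TaskTimer(clock: Clock) {
        private val launchTime = clock.getTimeMillis()
    
        /** Duration measured with the same clock that recorded the launch time. */
        def markFinished(): Long = clock.getTimeMillis() - launchTime
      }
    }
    ```
    
    With a manual clock, a test can advance time by a known amount and assert
    on the exact task duration.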
    
    ## How was this patch tested?
    Existing tests.
    
    Author: jinxing <[email protected]>
    
    Closes #17133 from jinxing64/SPARK-19793.

commit 40da4d181d648308de85fdcabc5c098ee861949a
Author: Liwei Lin <[email protected]>
Date:   2017-03-09T19:02:44Z

    [SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSource
    
    ## What changes were proposed in this pull request?
    
    Today, we compare the whole path when deciding if a file is new in the 
FileSource for structured streaming. However, this would cause false negatives 
in the case where the path has changed in a cosmetic way (e.g. changing `s3n` 
to `s3a`).
    
    This patch adds an option `fileNameOnly` that causes the new file check to 
be based only on the filename (but still store the whole path in the log).
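    
    The difference between the two checks comes down to which key is used to
    remember a file. A minimal sketch in plain Scala, with a hypothetical
    helper name and `java.net.URI` standing in for the path handling:
    
    ```scala
    import java.net.URI
    
    object FileNameOnlySketch {
      /** Key used to decide whether a file has been seen before. */
      def seenKey(path: String, fileNameOnly: Boolean): String =
        if (fileNameOnly) new URI(path).getPath.split('/').last // last path segment
        else path                                               // whole path, scheme included
    }
    ```
    
    With `fileNameOnly` enabled, `s3n://bucket/dir1/dir2/a.txt` and
    `s3a://bucket/dir1/dir2/a.txt` map to the same key, so the file is not
    treated as new after the scheme change.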
    
    ## Usage
    
    ```scala
    spark
      .readStream
      .option("fileNameOnly", true)
      .text("s3n://bucket/dir1/dir2")
      .writeStream
      ...
    ```
    ## How was this patch tested?
    
    Added a test case
    
    Author: Liwei Lin <[email protected]>
    
    Closes #17120 from lw-lin/filename-only.

commit 30b18e69361746b4d656474374d8b486bb48a19e
Author: uncleGen <[email protected]>
Date:   2017-03-09T19:07:31Z

    [SPARK-19861][SS] watermark should not be a negative time.
    
    ## What changes were proposed in this pull request?
    
    `watermark` should not be negative. A negative value is invalid, so check 
for it before the query actually runs.
    
    ## How was this patch tested?
    
    add new unit test.
    
    Author: uncleGen <[email protected]>
    Author: dylon <[email protected]>
    
    Closes #17202 from uncleGen/SPARK-19861.

commit cabe1df8606e7e5b9e6efb106045deb3f39f5f13
Author: Jeff Zhang <[email protected]>
Date:   2017-03-09T19:44:34Z

    [SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc 
file in DataFrameReader.orc
    
    Besides the issue in the Spark API, this also fixes 2 minor issues in PySpark:
    - support read from multiple input paths for orc
    - support read from multiple input paths for text
    
    Author: Jeff Zhang <[email protected]>
    
    Closes #10307 from zjffdu/SPARK-12334.

commit f79371ad86d94da14bd1ddb53e99a388017b6892
Author: Budde <[email protected]>
Date:   2017-03-09T20:55:33Z

    [SPARK-19611][SQL] Introduce configurable table schema inference
    
    ## Summary of changes
    
    Add a new configuration option that allows Spark SQL to infer a 
case-sensitive schema from a Hive Metastore table's data files when a 
case-sensitive schema can't be read from the table properties.
    
    - Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
    - Add schemaPreservesCase field to CatalogTable (set to false when schema
      can't successfully be read from Hive table props)
    - Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is
      false, depending on spark.sql.hive.caseSensitiveInferenceMode
    - Add alterTableSchema() method to the ExternalCatalog interface
    - Add HiveSchemaInferenceSuite tests
    - Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() as
      HiveMetastoreCatalog.mergeWithMetastoreSchema
    - Move schema merging tests from ParquetSchemaSuite to
      HiveSchemaInferenceSuite
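    
    The idea behind merging an inferred schema with the metastore schema can 
    be sketched on field names alone. This is a simplified illustration under 
    stated assumptions: `mergeFieldNames` is a hypothetical function, and the 
    actual merge in HiveMetastoreCatalog works on full schemas and may differ.
    
    ```scala
    object SchemaMergeSketch {
      // Replace each metastore field name with the case-preserving name
      // found in the data files, when one exists; otherwise keep it as-is.
      def mergeFieldNames(metastoreFields: Seq[String], inferredFields: Seq[String]): Seq[String] = {
        val byLowerCase = inferredFields.map(f => f.toLowerCase -> f).toMap
        metastoreFields.map(f => byLowerCase.getOrElse(f.toLowerCase, f))
      }
    }
    ```
    
    Fields that appear only in the metastore (for example partition columns) 
    keep their metastore spelling in this sketch.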
    
    [JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)
    
    ## How was this patch tested?
    
    The tests in `HiveSchemaInferenceSuite` should verify that schema 
    inference is working as expected. `ExternalCatalogSuite` has also been 
    extended to cover the new `alterTableSchema()` API.
    
    Author: Budde <[email protected]>
    
    Closes #16944 from budde/SPARK-19611.

commit 82138e09b9ad8d9609d5c64d6c11244b8f230be7
Author: Burak Yavuz <[email protected]>
Date:   2017-03-10T01:42:10Z

    [SPARK-19886] Fix reportDataLoss if statement in SS KafkaSource
    
    ## What changes were proposed in this pull request?
    
    Fix the `throw new IllegalStateException` if statement part.
    
    ## How is this patch tested
    
    Regression test
    
    Author: Burak Yavuz <[email protected]>
    
    Closes #17228 from brkyvz/kafka-cause-fix.

commit 5949e6c4477fd3cb07a6962dbee48b4416ea65dd
Author: Kazuaki Ishizaki <[email protected]>
Date:   2017-03-10T06:58:52Z

    [SPARK-19008][SQL] Improve performance of Dataset.map by eliminating 
boxing/unboxing
    
    ## What changes were proposed in this pull request?
    
    This PR improves the performance of Dataset.map() for primitive types by 
    removing boxing/unboxing operations. This is based on [the 
    discussion](https://github.com/apache/spark/pull/16391#discussion_r93788919) 
    with cloud-fan.
    
    Currently, Catalyst generates a method call to the `apply()` method of an 
    anonymous function written in Scala. The argument and return types are 
    `java.lang.Object`. As a result, each method call for a primitive value 
    involves a pair of unboxing and boxing for calling this `apply()` method 
    and a pair of boxing and unboxing for returning from it.
    
    This PR directly calls a specialized version of the `apply()` method 
    without boxing and unboxing. For example, if the types of the argument and 
    return value are `int`, this PR generates a method call to `apply$mcII$sp`. 
    This PR supports any combination of `Int`, `Long`, `Float`, and `Double`.
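    
    The difference between the two call paths can be sketched in plain Scala. 
    This is an illustration only: `callErased` and `callPrimitive` are 
    hypothetical helpers, not the code Catalyst generates.
    
    ```scala
    object BoxingSketch {
      // The motivating function from this PR: multiply an Int by 7.
      val times7: Int => Int = i => i * 7

      // Erased call path: the argument is boxed to java.lang.Integer for
      // apply(Object), and the Object result is unboxed back to Int.
      def callErased(fn: Any => Any, arg: Int): Int = {
        val boxedArg: Any = Int.box(arg)
        val boxedResult: Any = fn(boxedArg)
        boxedResult.asInstanceOf[Int]
      }

      // Primitive call path: with a statically known Int => Int, the compiler
      // can typically dispatch to the specialized method and avoid boxing.
      def callPrimitive(fn: Int => Int, arg: Int): Int = fn(arg)
    }
    ```
    
    Both paths return the same value. The erased one pays for the extra 
    allocation and conversion on every call, which is the cost this PR removes 
    from the generated code.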
    
    The following is a benchmark result using [this 
    program](https://github.com/apache/spark/pull/16391/files), showing a 4.7x 
    improvement for the Dataset case (best time 3094 ms to 657 ms). The Dataset 
    part of this program is listed after the results.
    
    Without this PR
    ```
    OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
    Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
    back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    RDD                                           1923 / 1952         52.0          19.2       1.0X
    DataFrame                                      526 /  548        190.2           5.3       3.7X
    Dataset                                       3094 / 3154         32.3          30.9       0.6X
    ```
    
    With this PR
    ```
    OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
    Intel(R) Xeon(R) CPU E5-2667 v3  3.20GHz
    back-to-back map:                        Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
    ------------------------------------------------------------------------------------------------
    RDD                                           1883 / 1892         53.1          18.8       1.0X
    DataFrame                                      502 /  642        199.1           5.0       3.7X
    Dataset                                        657 /  784        152.2           6.6       2.9X
    ```
    
    ```scala
      def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
        import spark.implicits._
        val rdd = spark.sparkContext.range(0, numRows)
        val ds = spark.range(0, numRows)
        val func = (l: Long) => l + 1
        val benchmark = new Benchmark("back-to-back map", numRows)
    ...
        benchmark.addCase("Dataset") { iter =>
          var res = ds.as[Long]
          var i = 0
          while (i < numChains) {
            res = res.map(func)
            i += 1
          }
          res.queryExecution.toRdd.foreach(_ => Unit)
        }
        benchmark
      }
    ```
    
    A motivating example
    ```scala
    Seq(1, 2, 3).toDS.map(i => i * 7).show
    ```
    
    Generated code without this PR
    ```java
    /* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private scala.collection.Iterator inputadapter_input;
    /* 009 */   private UnsafeRow deserializetoobject_result;
    /* 010 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
deserializetoobject_holder;
    /* 011 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
deserializetoobject_rowWriter;
    /* 012 */   private int mapelements_argValue;
    /* 013 */   private UnsafeRow mapelements_result;
    /* 014 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
mapelements_holder;
    /* 015 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
mapelements_rowWriter;
    /* 016 */   private UnsafeRow serializefromobject_result;
    /* 017 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
serializefromobject_holder;
    /* 018 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
serializefromobject_rowWriter;
    /* 019 */
    /* 020 */   public GeneratedIterator(Object[] references) {
    /* 021 */     this.references = references;
    /* 022 */   }
    /* 023 */
    /* 024 */   public void init(int index, scala.collection.Iterator[] inputs) 
{
    /* 025 */     partitionIndex = index;
    /* 026 */     this.inputs = inputs;
    /* 027 */     inputadapter_input = inputs[0];
    /* 028 */     deserializetoobject_result = new UnsafeRow(1);
    /* 029 */     this.deserializetoobject_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result,
 0);
    /* 030 */     this.deserializetoobject_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder,
 1);
    /* 031 */
    /* 032 */     mapelements_result = new UnsafeRow(1);
    /* 033 */     this.mapelements_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result,
 0);
    /* 034 */     this.mapelements_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder,
 1);
    /* 035 */     serializefromobject_result = new UnsafeRow(1);
    /* 036 */     this.serializefromobject_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result,
 0);
    /* 037 */     this.serializefromobject_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder,
 1);
    /* 038 */
    /* 039 */   }
    /* 040 */
    /* 041 */   protected void processNext() throws java.io.IOException {
    /* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 043 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
    /* 045 */
    /* 046 */       boolean mapelements_isNull = true;
    /* 047 */       int mapelements_value = -1;
    /* 048 */       if (!false) {
    /* 049 */         mapelements_argValue = inputadapter_value;
    /* 050 */
    /* 051 */         mapelements_isNull = false;
    /* 052 */         if (!mapelements_isNull) {
    /* 053 */           Object mapelements_funcResult = null;
    /* 054 */           mapelements_funcResult = ((scala.Function1) 
references[0]).apply(mapelements_argValue);
    /* 055 */           if (mapelements_funcResult == null) {
    /* 056 */             mapelements_isNull = true;
    /* 057 */           } else {
    /* 058 */             mapelements_value = (Integer) mapelements_funcResult;
    /* 059 */           }
    /* 060 */
    /* 061 */         }
    /* 062 */
    /* 063 */       }
    /* 064 */
    /* 065 */       serializefromobject_rowWriter.zeroOutNullBytes();
    /* 066 */
    /* 067 */       if (mapelements_isNull) {
    /* 068 */         serializefromobject_rowWriter.setNullAt(0);
    /* 069 */       } else {
    /* 070 */         serializefromobject_rowWriter.write(0, mapelements_value);
    /* 071 */       }
    /* 072 */       append(serializefromobject_result);
    /* 073 */       if (shouldStop()) return;
    /* 074 */     }
    /* 075 */   }
    /* 076 */ }
    ```
    
    Generated code with this PR (lines 48-56 are changed)
    ```java
    /* 005 */ final class GeneratedIterator extends 
org.apache.spark.sql.execution.BufferedRowIterator {
    /* 006 */   private Object[] references;
    /* 007 */   private scala.collection.Iterator[] inputs;
    /* 008 */   private scala.collection.Iterator inputadapter_input;
    /* 009 */   private UnsafeRow deserializetoobject_result;
    /* 010 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
deserializetoobject_holder;
    /* 011 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
deserializetoobject_rowWriter;
    /* 012 */   private int mapelements_argValue;
    /* 013 */   private UnsafeRow mapelements_result;
    /* 014 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
mapelements_holder;
    /* 015 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
mapelements_rowWriter;
    /* 016 */   private UnsafeRow serializefromobject_result;
    /* 017 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder 
serializefromobject_holder;
    /* 018 */   private 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter 
serializefromobject_rowWriter;
    /* 019 */
    /* 020 */   public GeneratedIterator(Object[] references) {
    /* 021 */     this.references = references;
    /* 022 */   }
    /* 023 */
    /* 024 */   public void init(int index, scala.collection.Iterator[] inputs) 
{
    /* 025 */     partitionIndex = index;
    /* 026 */     this.inputs = inputs;
    /* 027 */     inputadapter_input = inputs[0];
    /* 028 */     deserializetoobject_result = new UnsafeRow(1);
    /* 029 */     this.deserializetoobject_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result,
 0);
    /* 030 */     this.deserializetoobject_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder,
 1);
    /* 031 */
    /* 032 */     mapelements_result = new UnsafeRow(1);
    /* 033 */     this.mapelements_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result,
 0);
    /* 034 */     this.mapelements_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder,
 1);
    /* 035 */     serializefromobject_result = new UnsafeRow(1);
    /* 036 */     this.serializefromobject_holder = new 
org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result,
 0);
    /* 037 */     this.serializefromobject_rowWriter = new 
org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder,
 1);
    /* 038 */
    /* 039 */   }
    /* 040 */
    /* 041 */   protected void processNext() throws java.io.IOException {
    /* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
    /* 043 */       InternalRow inputadapter_row = (InternalRow) 
inputadapter_input.next();
    /* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
    /* 045 */
    /* 046 */       boolean mapelements_isNull = true;
    /* 047 */       int mapelements_value = -1;
    /* 048 */       if (!false) {
    /* 049 */         mapelements_argValue = inputadapter_value;
    /* 050 */
    /* 051 */         mapelements_isNull = false;
    /* 052 */         if (!mapelements_isNull) {
    /* 053 */           mapelements_value = ((scala.Function1) 
references[0]).apply$mcII$sp(mapelements_argValue);
    /* 054 */         }
    /* 055 */
    /* 056 */       }
    /* 057 */
    /* 058 */       serializefromobject_rowWriter.zeroOutNullBytes();
    /* 059 */
    /* 060 */       if (mapelements_isNull) {
    /* 061 */         serializefromobject_rowWriter.setNullAt(0);
    /* 062 */       } else {
    /* 063 */         serializefromobject_rowWriter.write(0, mapelements_value);
    /* 064 */       }
    /* 065 */       append(serializefromobject_result);
    /* 066 */       if (shouldStop()) return;
    /* 067 */     }
    /* 068 */   }
    /* 069 */ }
    ```
    
    Java bytecode for methods for `i => i * 7`
    ```java
    $ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class
    Compiled from "Test.scala"
    public final class 
org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends 
scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
      public static final long serialVersionUID;
    
      public final int apply(int);
        Code:
           0: aload_0
           1: iload_1
           2: invokevirtual #18                 // Method apply$mcII$sp:(I)I
           5: ireturn
    
      public int apply$mcII$sp(int);
        Code:
           0: iload_1
           1: bipush        7
           3: imul
           4: ireturn
    
      public final java.lang.Object apply(java.lang.Object);
        Code:
           0: aload_0
           1: aload_1
           2: invokestatic  #29                 // Method 
scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
           5: invokevirtual #31                 // Method apply:(I)I
           8: invokestatic  #35                 // Method 
scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
          11: areturn
    
      public 
org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5);
        Code:
           0: aload_0
           1: invokespecial #42                 // Method 
scala/runtime/AbstractFunction1$mcII$sp."<init>":()V
           4: return
    }
    ```
    ## How was this patch tested?
    
    Added new test suites to `DatasetPrimitiveSuite`.
    
    Author: Kazuaki Ishizaki <[email protected]>
    
    Closes #17172 from kiszk/SPARK-19008.

----

