GitHub user zhangwei72 opened a pull request:
https://github.com/apache/spark/pull/17547
SPARK-20237
## What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
## How was this patch tested?
(Please explain how this patch was tested. E.g. unit tests, integration
tests, manual tests)
(If this patch involves UI changes, please attach a screenshot; otherwise,
remove this)
Please review http://spark.apache.org/contributing.html before opening a
pull request.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/apache/spark master
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/17547.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #17547
----
commit 56e1bd337ccb03cb01702e4260e4be59d2aa0ead
Author: Asher Krim <[email protected]>
Date: 2017-03-08T04:36:46Z
[SPARK-17629][ML] methods to return synonyms directly
## What changes were proposed in this pull request?
Provide methods to return synonyms directly, without wrapping them in a
DataFrame. In performance-sensitive applications (such as user-facing APIs) the
round trip to and from DataFrames is costly and unnecessary.
The methods are named ``findSynonymsArray`` to make the return type clear,
which also implies a local data structure.
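A minimal usage sketch (assuming an already-fitted `Word2VecModel` named
`model`; the query word and count are illustrative, not from this PR):
```scala
// Hedged sketch: `model` is a fitted org.apache.spark.ml.feature.Word2VecModel.
// findSynonymsArray returns a local Array instead of a DataFrame, avoiding the round trip.
val synonyms: Array[(String, Double)] = model.findSynonymsArray("spark", 5)
synonyms.foreach { case (word, similarity) => println(s"$word -> $similarity") }
```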
## How was this patch tested?
updated word2vec tests
Author: Asher Krim <[email protected]>
Closes #16811 from Krimit/w2vFindSynonymsLocal.
commit 314e48a3584bad4b486b046bbf0159d64ba857bc
Author: Michael Armbrust <[email protected]>
Date: 2017-03-08T09:32:42Z
[SPARK-18055][SQL] Use correct mirror in ExpressionEncoder
Previously, we were using the mirror of the passed-in `TypeTag` when reflecting
to build an encoder. This fails when the outer class is built in (i.e. `Seq`'s
default mirror is based on the root classloader) but inner classes (i.e. `A` in
`Seq[A]`) are defined in the REPL or a library.
This patch changes us to always reflect based on a mirror created using the
context classloader.
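A small sketch of the underlying idea using the standard Scala reflection API
(not the PR's exact code):
```scala
import scala.reflect.runtime.{universe => ru}

// Build the mirror from the context classloader so classes defined in the REPL
// (or a user library) are visible, instead of relying on the TypeTag's own mirror.
val mirror = ru.runtimeMirror(Thread.currentThread().getContextClassLoader)
```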
Author: Michael Armbrust <[email protected]>
Closes #17201 from marmbrus/replSeqEncoder.
commit 1fa58868bc6635ff2119264665bd3d00b4b1253a
Author: Yanbo Liang <[email protected]>
Date: 2017-03-08T10:05:01Z
[ML][MINOR] Separate estimator and model params for read/write test.
## What changes were proposed in this pull request?
Since we allow an ```Estimator``` and its ```Model``` to not always share the
same params (see ```ALSParams``` and ```ALSModelParams```), we should pass in
test params for the estimator and model separately in
```testEstimatorAndModelReadWrite```.
## How was this patch tested?
Existing tests.
Author: Yanbo Liang <[email protected]>
Closes #17151 from yanboliang/test-rw.
commit 81303f7ca7808d51229411dce8feeed8c23dbe15
Author: Yanbo Liang <[email protected]>
Date: 2017-03-08T10:09:36Z
[SPARK-19806][ML][PYSPARK] PySpark GeneralizedLinearRegression supports
tweedie distribution.
## What changes were proposed in this pull request?
PySpark ```GeneralizedLinearRegression``` supports tweedie distribution.
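For reference, a hedged sketch of the Scala-side parameters that this change
exposes to PySpark (the variance power value is illustrative):
```scala
import org.apache.spark.ml.regression.GeneralizedLinearRegression

// Tweedie family with an illustrative variance power; the PySpark API mirrors these setters.
val glr = new GeneralizedLinearRegression()
  .setFamily("tweedie")
  .setVariancePower(1.6)
```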
## How was this patch tested?
Add unit tests.
Author: Yanbo Liang <[email protected]>
Closes #17146 from yanboliang/spark-19806.
commit 3f9f9180c2e695ad468eb813df5feec41e169531
Author: Yuming Wang <[email protected]>
Date: 2017-03-08T11:31:01Z
[SPARK-19693][SQL] Make the SET mapreduce.job.reduces automatically
converted to spark.sql.shuffle.partitions
## What changes were proposed in this pull request?
Make `SET mapreduce.job.reduces` automatically convert to
`spark.sql.shuffle.partitions`, similar to how `SET mapred.reduce.tasks` is handled.
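A hedged sketch of the behavior (assumes an active `SparkSession` named
`spark`; the value is illustrative):
```scala
// After this change, the Hadoop-style key below is treated the same way as
// SET mapred.reduce.tasks, i.e. it maps onto spark.sql.shuffle.partitions.
spark.sql("SET mapreduce.job.reduces=10")
spark.sql("SET spark.sql.shuffle.partitions").show()
```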
## How was this patch tested?
unit tests
Author: Yuming Wang <[email protected]>
Closes #17020 from wangyum/SPARK-19693.
commit 9ea201cf6482c9c62c9428759d238063db62d66e
Author: Anthony Truchet <[email protected]>
Date: 2017-03-08T11:44:25Z
[SPARK-16440][MLLIB] Ensure broadcasted variables are destroyed even in
case of exception
## What changes were proposed in this pull request?
Ensure broadcasted variables are destroyed even in case of an exception
## How was this patch tested?
Word2VecSuite was run locally
Author: Anthony Truchet <[email protected]>
Closes #14299 from AnthonyTruchet/SPARK-16440.
commit e44274870dee308f4e3e8ce79457d8d19693b6e5
Author: wangzhenhua <[email protected]>
Date: 2017-03-08T15:01:28Z
[SPARK-17080][SQL] join reorder
## What changes were proposed in this pull request?
Reorder the joins using a dynamic programming algorithm (Selinger paper):
First we put all items (basic joined nodes) into level 1, then we build all
two-way joins at level 2 from plans at level 1 (single items), then build all
3-way joins from plans at previous levels (two-way joins and single items),
then 4-way joins ... etc, until we build all n-way joins and pick the best plan
among them.
When building m-way joins, we only keep the best plan (with the lowest
cost) for the same set of m items. E.g., for 3-way joins, we keep only the best
plan for items {A, B, C} among plans (A J B) J C, (A J C) J B and (B J C) J A.
Thus, the plans maintained for each level when reordering four items A, B, C, D
are as follows:
```
level 1: p({A}), p({B}), p({C}), p({D})
level 2: p({A, B}), p({A, C}), p({A, D}), p({B, C}), p({B, D}), p({C, D})
level 3: p({A, B, C}), p({A, B, D}), p({A, C, D}), p({B, C, D})
level 4: p({A, B, C, D})
```
where p({A, B, C, D}) is the final output plan.
For cost evaluation, since physical costs for operators are not available
currently, we use cardinalities and sizes to compute costs.
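A self-contained sketch of this level-by-level dynamic programming (the `Plan`
class and the caller-supplied cost function are illustrative, not Spark's
internal classes):
```scala
// Selinger-style join enumeration: keep only the cheapest plan per item set at each level.
case class Plan(items: Set[String], cost: Double)

def reorder(items: Seq[String], joinCost: (Set[String], Set[String]) => Double): Plan = {
  // level 1: single items carry no join cost
  var best: Map[Set[String], Plan] = items.map(i => Set(i) -> Plan(Set(i), 0.0)).toMap
  for (level <- 2 to items.size) {
    val next = scala.collection.mutable.Map.empty[Set[String], Plan]
    for {
      (leftSet, left)   <- best
      (rightSet, right) <- best
      if leftSet.size + rightSet.size == level && (leftSet & rightSet).isEmpty
    } {
      val combined = leftSet ++ rightSet
      val cost = left.cost + right.cost + joinCost(leftSet, rightSet)
      // keep only the best plan for this set of items
      if (next.get(combined).forall(_.cost > cost)) next(combined) = Plan(combined, cost)
    }
    best ++= next
  }
  best(items.toSet)
}
```
For instance, `reorder(Seq("A", "B", "C", "D"), (l, r) => (l.size * r.size).toDouble)`
enumerates exactly the level-1 through level-4 plans listed above.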
## How was this patch tested?
add test cases
Author: wangzhenhua <[email protected]>
Author: Zhenhua Wang <[email protected]>
Closes #17138 from wzhfy/joinReorder.
commit 5f7d835d380c1a558a4a6d8366140cd96ee202eb
Author: jiangxingbo <[email protected]>
Date: 2017-03-08T15:18:17Z
[SPARK-19865][SQL] remove the view identifier in SubqueryAlias
## What changes were proposed in this pull request?
Since we have a `View` node now, we can remove the view identifier in
`SubqueryAlias`, which was used to indicate a view node before.
## How was this patch tested?
Update the related test cases.
Author: jiangxingbo <[email protected]>
Closes #17210 from jiangxb1987/SubqueryAlias.
commit 9a6ac7226fd09d570cae08d0daea82d9bca189a0
Author: Xiao Li <[email protected]>
Date: 2017-03-08T17:36:01Z
[SPARK-19601][SQL] Fix CollapseRepartition rule to preserve shuffle-enabled
Repartition
### What changes were proposed in this pull request?
As observed by felixcheung in https://github.com/apache/spark/pull/16739,
when users use the shuffle-enabled `repartition` API, they expect the number of
partitions they get to be exactly the number they provided, even if they call
the shuffle-disabled `coalesce` later.
Currently, the `CollapseRepartition` rule does not consider whether shuffle is
enabled or not. Thus, we get the following unexpected result.
```Scala
val df = spark.range(0, 10000, 1, 5)
val df2 = df.repartition(10)
assert(df2.coalesce(13).rdd.getNumPartitions == 5)
assert(df2.coalesce(7).rdd.getNumPartitions == 5)
assert(df2.coalesce(3).rdd.getNumPartitions == 3)
```
This PR is to fix the issue. We preserve shuffle-enabled Repartition.
### How was this patch tested?
Added a test case
Author: Xiao Li <[email protected]>
Closes #16933 from gatorsmile/CollapseRepartition.
commit e420fd4592615d91cdcbca674ac58bcca6ab2ff3
Author: Tejas Patil <[email protected]>
Date: 2017-03-08T17:38:05Z
[SPARK-19843][SQL][FOLLOWUP] Classdoc for `IntWrapper` and `LongWrapper`
## What changes were proposed in this pull request?
This is as per suggestion by rxin at :
https://github.com/apache/spark/pull/17184#discussion_r104841735
## How was this patch tested?
NA as this is a documentation change
Author: Tejas Patil <[email protected]>
Closes #17205 from tejasapatil/SPARK-19843_followup.
commit f3387d97487cbef894b6963bc008f6a5c4294a85
Author: windpiger <[email protected]>
Date: 2017-03-08T18:48:53Z
[SPARK-19864][SQL][TEST] provide a makeQualifiedPath function to optimize
some code
## What changes were proposed in this pull request?
Currently there are lots of places where we make a path qualified; it is
better to provide a single function to do this, so the code becomes simpler.
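A hedged sketch of what such a helper might look like, using the Hadoop
`FileSystem`/`Path` API (the exact signature in the PR may differ):
```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path

// Resolve a possibly scheme-less path (e.g. "/path/xx") into a fully qualified
// one (e.g. "file:/path/xx") using the file system that owns it.
def makeQualifiedPath(path: String, hadoopConf: Configuration): Path = {
  val p = new Path(path)
  p.getFileSystem(hadoopConf).makeQualified(p)
}
```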
## How was this patch tested?
N/A
Author: windpiger <[email protected]>
Closes #17204 from windpiger/addQualifiledPathUtil.
commit e9e2c612d58a19ddcb4b6abfb7389a4b0f7ef6f8
Author: Wojtek Szymanski <[email protected]>
Date: 2017-03-08T20:36:16Z
[SPARK-19727][SQL] Fix for round function that modifies original column
## What changes were proposed in this pull request?
Fix for the SQL round function that modifies the original column when the
underlying DataFrame is created from a local Product.
```scala
import org.apache.spark.sql.functions._

case class NumericRow(value: BigDecimal)

val df = spark.createDataFrame(Seq(NumericRow(BigDecimal("1.23456789"))))
df.show()
// +--------------------+
// |               value|
// +--------------------+
// |1.234567890000000000|
// +--------------------+

df.withColumn("value_rounded", round('value)).show()

// before
// +--------------------+-------------+
// |               value|value_rounded|
// +--------------------+-------------+
// |1.000000000000000000|            1|
// +--------------------+-------------+

// after
// +--------------------+-------------+
// |               value|value_rounded|
// +--------------------+-------------+
// |1.234567890000000000|            1|
// +--------------------+-------------+
```
## How was this patch tested?
New unit test added to existing suite
`org.apache.spark.sql.MathFunctionsSuite`
Author: Wojtek Szymanski <[email protected]>
Closes #17075 from wojtek-szymanski/SPARK-19727.
commit 1bf9012380de2aa7bdf39220b55748defde8b700
Author: Shixiong Zhu <[email protected]>
Date: 2017-03-08T21:18:07Z
[SPARK-19858][SS] Add output mode to flatMapGroupsWithState and disallow
invalid cases
## What changes were proposed in this pull request?
Add an output mode parameter to `flatMapGroupsWithState` and just define
`mapGroupsWithState` as `flatMapGroupsWithState(Update)`.
`UnsupportedOperationChecker` is modified to disallow unsupported cases.
- Batch mapGroupsWithState or flatMapGroupsWithState is always allowed.
- For streaming (map/flatMap)GroupsWithState, see the following table:
| Operators | Supported Query Output Mode |
| ------------- | ------------- |
| flatMapGroupsWithState(Update) without aggregation | Update |
| flatMapGroupsWithState(Update) with aggregation | None |
| flatMapGroupsWithState(Append) without aggregation | Append |
| flatMapGroupsWithState(Append) before aggregation | Append, Update, Complete |
| flatMapGroupsWithState(Append) after aggregation | None |
| Multiple flatMapGroupsWithState(Append)s | Append |
| Multiple mapGroupsWithStates | None |
| Mixing mapGroupsWithStates and flatMapGroupsWithStates | None |
| Other cases of multiple flatMapGroupsWithState | None |
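Below is a hedged usage sketch against the Spark 2.2 API shape (the exact
signature in this PR may differ slightly; the event type and the counting logic
are invented for illustration):
```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

// Running count per key, with the output mode passed explicitly to the stateful operator.
def runningCounts(spark: SparkSession, events: Dataset[(String, Int)]): Dataset[(String, Long)] = {
  import spark.implicits._
  events
    .groupByKey(_._1)
    .flatMapGroupsWithState(OutputMode.Update, GroupStateTimeout.NoTimeout) {
      (key: String, values: Iterator[(String, Int)], state: GroupState[Long]) =>
        val total = state.getOption.getOrElse(0L) + values.map(_._2.toLong).sum
        state.update(total)
        Iterator((key, total))
    }
}
```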
## How was this patch tested?
The added unit tests. Here are the tests related to
(map/flatMap)GroupsWithState:
```
[info] - batch plan - flatMapGroupsWithState -
flatMapGroupsWithState(Append) on batch relation: supported (1 millisecond)
[info] - batch plan - flatMapGroupsWithState - multiple
flatMapGroupsWithState(Append)s on batch relation: supported (0 milliseconds)
[info] - batch plan - flatMapGroupsWithState -
flatMapGroupsWithState(Update) on batch relation: supported (0 milliseconds)
[info] - batch plan - flatMapGroupsWithState - multiple
flatMapGroupsWithState(Update)s on batch relation: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Update) on streaming relation without aggregation in
update mode: supported (2 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Update) on streaming relation without aggregation in
append mode: not supported (7 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Update) on streaming relation without aggregation in
complete mode: not supported (5 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Update) on streaming relation with aggregation in Append
mode: not supported (11 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Update) on streaming relation with aggregation in Update
mode: not supported (5 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Update) on streaming relation with aggregation in
Complete mode: not supported (5 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Append) on streaming relation without aggregation in
append mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Append) on streaming relation without aggregation in
update mode: not supported (6 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Append) on streaming relation before aggregation in
Append mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Append) on streaming relation before aggregation in
Update mode: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Append) on streaming relation before aggregation in
Complete mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Append) on streaming relation after aggregation in
Append mode: not supported (6 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Append) on streaming relation after aggregation in
Update mode: not supported (4 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Update) on streaming relation in complete mode: not
supported (2 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Append) on batch relation inside streaming relation in
Append output mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Append) on batch relation inside streaming relation in
Update output mode: supported (1 millisecond)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Update) on batch relation inside streaming relation in
Append output mode: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState -
flatMapGroupsWithState(Update) on batch relation inside streaming relation in
Update output mode: supported (0 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - multiple
flatMapGroupsWithStates on streaming relation and all are in append mode:
supported (2 milliseconds)
[info] - streaming plan - flatMapGroupsWithState - multiple
flatMapGroupsWithStates on s streaming relation but some are not in append
mode: not supported (7 milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on
streaming relation without aggregation in append mode: not supported (3
milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on
streaming relation without aggregation in complete mode: not supported (3
milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on
streaming relation with aggregation in Append mode: not supported (6
milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on
streaming relation with aggregation in Update mode: not supported (3
milliseconds)
[info] - streaming plan - mapGroupsWithState - mapGroupsWithState on
streaming relation with aggregation in Complete mode: not supported (4
milliseconds)
[info] - streaming plan - mapGroupsWithState - multiple mapGroupsWithStates
on streaming relation and all are in append mode: not supported (4 milliseconds)
[info] - streaming plan - mapGroupsWithState - mixing mapGroupsWithStates
and flatMapGroupsWithStates on streaming relation: not supported (4
milliseconds)
```
Author: Shixiong Zhu <[email protected]>
Closes #17197 from zsxwing/mapgroups-check.
commit 6570cfd7abe349dc6d2151f2ac9dc662e7465a79
Author: Kunal Khamar <[email protected]>
Date: 2017-03-08T21:06:22Z
[SPARK-19540][SQL] Add ability to clone SparkSession wherein cloned session
has an identical copy of the SessionState
Forking a newSession() from SparkSession currently makes a new SparkSession
that does not retain SessionState (i.e. temporary tables, SQL config,
registered functions etc.). This change adds a method cloneSession() which
creates a new SparkSession with a copy of the parent's SessionState.
Subsequent changes to the base session are not propagated to the cloned
session; the clone is independent after creation.
If the base is changed after the clone has been created, say the user registers
a new UDF, then the new UDF will not be available inside the clone. The same
goes for configs and temp tables.
Unit tests
Author: Kunal Khamar <[email protected]>
Author: Shixiong Zhu <[email protected]>
Closes #16826 from kunalkhamar/fork-sparksession.
commit 455129020ca7f6a162f6f2486a87cc43512cfd2c
Author: hyukjinkwon <[email protected]>
Date: 2017-03-08T21:43:09Z
[SPARK-15463][SQL] Add an API to load DataFrame from Dataset[String]
storing CSV
## What changes were proposed in this pull request?
This PR proposes to add an API that loads a `DataFrame` from a
`Dataset[String]` storing CSV.
It allows pre-processing before loading the data as CSV, which enables a lot
of workarounds for many narrow cases, for example, as below:
- Case 1 - pre-processing
```scala
val df = spark.read.text("...")
// Pre-processing with this.
spark.read.csv(df.as[String])
```
- Case 2 - use other input formats
```scala
val rdd = spark.sparkContext.newAPIHadoopFile("/file.csv.lzo",
classOf[com.hadoop.mapreduce.LzoTextInputFormat],
classOf[org.apache.hadoop.io.LongWritable],
classOf[org.apache.hadoop.io.Text])
val stringRdd = rdd.map(pair => new String(pair._2.getBytes, 0,
pair._2.getLength))
spark.read.csv(stringRdd.toDS)
```
## How was this patch tested?
Added tests in `CSVSuite` and built with Scala 2.10.
```
./dev/change-scala-version.sh 2.10
./build/mvn -Pyarn -Phadoop-2.4 -Dscala-2.10 -DskipTests clean package
```
Author: hyukjinkwon <[email protected]>
Closes #16854 from HyukjinKwon/SPARK-15463.
commit a3648b5d4f99ff9461d02f53e9ec71787a3abf51
Author: Burak Yavuz <[email protected]>
Date: 2017-03-08T22:35:07Z
[SPARK-19813] maxFilesPerTrigger combo latestFirst may miss old files in
combination with maxFileAge in FileStreamSource
## What changes were proposed in this pull request?
**The Problem**
There is a file stream source option called maxFileAge which limits how old
the files can be, relative to the latest file that has been seen. This is used
to limit the files that need to be remembered as "processed". Files older than
the latest processed files are ignored. This value is 7 days by default.
This causes a problem when both
- latestFirst = true
- maxFilesPerTrigger > total files to be processed.
Here is what happens in all combinations
1) latestFirst = false - Since files are processed in order, there won't be
any unprocessed file older than the latest processed file. All files will be
processed.
2) latestFirst = true AND maxFilesPerTrigger is not set - The maxFileAge
thresholding mechanism takes one batch to initialize. If maxFilesPerTrigger is
not set, then all old files get processed in the first batch, and so no file is
left behind.
3) latestFirst = true AND maxFilesPerTrigger is set to X - The first batch
processes the latest X files. That sets the threshold to (latest file - maxFileAge),
so files older than this threshold will never be considered for processing.
The bug is with case 3.
**The Solution**
Ignore `maxFileAge` when both `maxFilesPerTrigger` and `latestFirst` are
set.
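A hedged sketch of the option combination in question (the path and values are
illustrative):
```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// With latestFirst and maxFilesPerTrigger both set, maxFileAge is ignored after
// this fix, so older files are still picked up in later batches.
def readIncoming(spark: SparkSession): DataFrame =
  spark.readStream
    .option("latestFirst", "true")
    .option("maxFilesPerTrigger", "10")
    .text("/data/incoming")
```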
## How was this patch tested?
Regression test in `FileStreamSourceSuite`
Author: Burak Yavuz <[email protected]>
Closes #17153 from brkyvz/maxFileAge.
commit d809ceed9762d5bbb04170e45f38751713112dd8
Author: Dilip Biswal <[email protected]>
Date: 2017-03-09T01:33:49Z
[MINOR][SQL] The analyzer rules are fired twice for cases when
AnalysisException is raised from analyzer.
## What changes were proposed in this pull request?
In general we have a checkAnalysis phase which validates the logical plan
and throws AnalysisException on semantic errors. However, we can also throw
AnalysisException from a few analyzer rules like ResolveSubquery.
I found that we fire the analyzer rules twice for queries that throw
AnalysisException from one of the analyzer rules. This is a very minor fix; we
don't strictly have to fix it. I just got confused seeing the rule getting
fired two times when I was not expecting it.
## How was this patch tested?
Tested manually.
Author: Dilip Biswal <[email protected]>
Closes #17214 from dilipbiswal/analyis_twice.
commit 09829be621f0f9bb5076abb3d832925624699fa9
Author: Xiao Li <[email protected]>
Date: 2017-03-09T07:12:10Z
[SPARK-19235][SQL][TESTS] Enable Test Cases in DDLSuite with Hive Metastore
### What changes were proposed in this pull request?
So far, the test cases in DDLSuite only verify the behaviors of
InMemoryCatalog. That means they do not cover the scenarios using
HiveExternalCatalog. Thus, we need to improve the existing test suite to run
these cases using the Hive metastore.
When porting these test cases, a bug of `SET LOCATION` is found. `path` is
not set when the location is changed.
After this PR, a few changes are made, as summarized below,
- `DDLSuite` becomes an abstract class. Both `InMemoryCatalogedDDLSuite`
and `HiveCatalogedDDLSuite` extend it. `InMemoryCatalogedDDLSuite` is using
`InMemoryCatalog`. `HiveCatalogedDDLSuite` is using `HiveExternalCatalog`.
- `InMemoryCatalogedDDLSuite` contains all the existing test cases in
`DDLSuite`.
- `HiveCatalogedDDLSuite` contains a subset of `DDLSuite`. The following
test cases are excluded:
1. The following test cases only make sense for `InMemoryCatalog`:
```
test("desc table for parquet data source table using in-memory catalog")
test("create a managed Hive source table") {
test("create an external Hive source table")
test("Create Hive Table As Select")
```
2. The following test cases are unable to be ported because we are unable
to alter table provider when using Hive metastore. In the future PRs we need to
improve the test cases so that altering table provider is not needed:
```
test("alter table: set location (datasource table)")
test("alter table: set properties (datasource table)")
test("alter table: unset properties (datasource table)")
test("alter table: set serde (datasource table)")
test("alter table: set serde partition (datasource table)")
test("alter table: change column (datasource table)")
test("alter table: add partition (datasource table)")
test("alter table: drop partition (datasource table)")
test("alter table: rename partition (datasource table)")
test("drop table - data source table")
```
**TODO**: in future PRs, we need to remove `HiveDDLSuite` and move the
test cases to either `DDLSuite`, `InMemoryCatalogedDDLSuite` or
`HiveCatalogedDDLSuite`.
### How was this patch tested?
N/A
Author: Xiao Li <[email protected]>
Author: gatorsmile <[email protected]>
Closes #16592 from gatorsmile/refactorDDLSuite.
commit 029e40b412e332c9f0fff283d604e203066c78c0
Author: Shixiong Zhu <[email protected]>
Date: 2017-03-09T07:15:52Z
[SPARK-19874][BUILD] Hide API docs for org.apache.spark.sql.internal
## What changes were proposed in this pull request?
The API docs should not include the "org.apache.spark.sql.internal" package
because it contains internal private APIs.
## How was this patch tested?
Jenkins
Author: Shixiong Zhu <[email protected]>
Closes #17217 from zsxwing/SPARK-19874.
commit eeb1d6db878641d9eac62d0869a90fe80c1f4461
Author: uncleGen <[email protected]>
Date: 2017-03-09T07:23:10Z
[SPARK-19859][SS][FOLLOW-UP] The new watermark should override the old one.
## What changes were proposed in this pull request?
A follow up to SPARK-19859:
- extract the calculation of `delayMs` and reuse it.
- update EventTimeWatermarkExec
- use the correct `delayMs` in EventTimeWatermark
## How was this patch tested?
Jenkins.
Author: uncleGen <[email protected]>
Closes #17221 from uncleGen/SPARK-19859.
commit 274973d2a32ff4eb28545b50a3135e4784eb3047
Author: windpiger <[email protected]>
Date: 2017-03-09T09:18:17Z
[SPARK-19763][SQL] qualified external datasource table location stored in
catalog
## What changes were proposed in this pull request?
If we create an external datasource table with a non-qualified location, we
should qualify it before storing it in the catalog.
```
CREATE TABLE t(a string)
USING parquet
LOCATION '/path/xx'
CREATE TABLE t1(a string, b string)
USING parquet
PARTITIONED BY(b)
LOCATION '/path/xx'
```
When we get the table from the catalog, the location should be qualified,
e.g. 'file:/path/xxx'.
## How was this patch tested?
unit test added
Author: windpiger <[email protected]>
Closes #17095 from windpiger/tablepathQualified.
commit 206030bd12405623c00c1ff334663984b9250adb
Author: Jason White <[email protected]>
Date: 2017-03-09T18:34:54Z
[SPARK-19561][SQL] add int case handling for TimestampType
## What changes were proposed in this pull request?
Add handling of input of type `Int` for dataType `TimestampType` to
`EvaluatePython.scala`. Py4J serializes ints smaller than MIN_INT or larger
than MAX_INT to Long, which are handled correctly already, but values between
MIN_INT and MAX_INT are serialized to Int.
These range limits correspond to roughly half an hour on either side of the
epoch. As a result, PySpark doesn't allow TimestampType values to be created in
this range.
Alternatives attempted: patching the `TimestampType.toInternal` function to
cast return values to `long`, so Py4J would always serialize them to Scala
Long. Python3 does not have a `long` type, so this approach failed on Python3.
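A simplified, hedged sketch of the kind of case being added (not the actual
`EvaluatePython` code; the function name is invented):
```scala
// Accept both Int and Long for a timestamp value, since Py4J serializes values
// between MIN_INT and MAX_INT as Int and everything else as Long.
def toTimestampLong(value: Any): Long = value match {
  case i: Int  => i.toLong
  case l: Long => l
  case other   => throw new IllegalArgumentException(s"Unexpected timestamp value: $other")
}
```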
## How was this patch tested?
Added a new PySpark-side test that fails without the change.
The contribution is my original work and I license the work to the project
under the project's open source license.
Resubmission of https://github.com/apache/spark/pull/16896. The original PR
didn't go through Jenkins and broke the build. davies dongjoon-hyun
cloud-fan Could you kick off a Jenkins run for me? It passed everything for
me locally, but it's possible something has changed in the last few weeks.
Author: Jason White <[email protected]>
Closes #17200 from JasonMWhite/SPARK-19561.
commit b60b9fc10a1ee52c3c021a4a5faf10f92f83e3c9
Author: Jimmy Xiang <[email protected]>
Date: 2017-03-09T18:52:18Z
[SPARK-19757][CORE] DriverEndpoint#makeOffers race against
CoarseGrainedSchedulerBackend#killExecutors
## What changes were proposed in this pull request?
While some executors are being killed due to idleness, if some new tasks
come in, the driver could assign them to executors that are being killed. These
tasks will fail later when the executors are lost. This patch is to make sure
CoarseGrainedSchedulerBackend#killExecutors and DriverEndpoint#makeOffers are
properly synchronized.
## How was this patch tested?
manual tests
Author: Jimmy Xiang <[email protected]>
Closes #17091 from jxiang/spark-19757.
commit 3232e54f2fcb8d2072cba4bc763ef29d5d8d325f
Author: jinxing <[email protected]>
Date: 2017-03-09T18:56:19Z
[SPARK-19793] Use clock.getTimeMillis when marking a task as finished in
TaskSetManager.
## What changes were proposed in this pull request?
TaskSetManager currently uses `System.currentTimeMillis` when marking a task
as finished in `handleSuccessfulTask` and `handleFailedTask`. Thus developers
cannot set a task's finishing time in unit tests. In `handleSuccessfulTask`,
the task's duration = `System.currentTimeMillis` - launchTime (which can be set
by `clock`), so the result is not correct.
## How was this patch tested?
Existing tests.
Author: jinxing <[email protected]>
Closes #17133 from jinxing64/SPARK-19793.
commit 40da4d181d648308de85fdcabc5c098ee861949a
Author: Liwei Lin <[email protected]>
Date: 2017-03-09T19:02:44Z
[SPARK-19715][STRUCTURED STREAMING] Option to Strip Paths in FileSource
## What changes were proposed in this pull request?
Today, we compare the whole path when deciding if a file is new in the
FileSource for structured streaming. However, this would cause false negatives
in the case where the path has changed in a cosmetic way (e.g. changing `s3n`
to `s3a`).
This patch adds an option `fileNameOnly` that causes the new file check to
be based only on the filename (but still store the whole path in the log).
## Usage
```scala
spark
.readStream
.option("fileNameOnly", true)
.text("s3n://bucket/dir1/dir2")
.writeStream
...
```
## How was this patch tested?
Added a test case
Author: Liwei Lin <[email protected]>
Closes #17120 from lw-lin/filename-only.
commit 30b18e69361746b4d656474374d8b486bb48a19e
Author: uncleGen <[email protected]>
Date: 2017-03-09T19:07:31Z
[SPARK-19861][SS] watermark should not be a negative time.
## What changes were proposed in this pull request?
`watermark` should not be negative. Such a value is invalid, so check it
before the query actually runs.
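For illustration (the column name and delay are made up), the delay passed to
`withWatermark` must be non-negative:
```scala
import org.apache.spark.sql.DataFrame

// Valid: a non-negative delay.
def withValidWatermark(events: DataFrame): DataFrame =
  events.withWatermark("eventTime", "10 minutes")

// A negative delay such as "-1 minutes" is now rejected before the query runs.
```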
## How was this patch tested?
add new unit test.
Author: uncleGen <[email protected]>
Author: dylon <[email protected]>
Closes #17202 from uncleGen/SPARK-19861.
commit cabe1df8606e7e5b9e6efb106045deb3f39f5f13
Author: Jeff Zhang <[email protected]>
Date: 2017-03-09T19:44:34Z
[SPARK-12334][SQL][PYSPARK] Support read from multiple input paths for orc
file in DataFrameReader.orc
Besides the issue in the Spark API, this also fixes two minor issues in PySpark:
- support reading from multiple input paths for ORC
- support reading from multiple input paths for text
Author: Jeff Zhang <[email protected]>
Closes #10307 from zjffdu/SPARK-12334.
commit f79371ad86d94da14bd1ddb53e99a388017b6892
Author: Budde <[email protected]>
Date: 2017-03-09T20:55:33Z
[SPARK-19611][SQL] Introduce configurable table schema inference
## Summary of changes
Add a new configuration option that allows Spark SQL to infer a
case-sensitive schema from a Hive Metastore table's data files when a
case-sensitive schema can't be read from the table properties.
- Add spark.sql.hive.caseSensitiveInferenceMode param to SQLConf
- Add schemaPreservesCase field to CatalogTable (set to false when the schema can't successfully be read from Hive table props)
- Perform schema inference in HiveMetastoreCatalog if schemaPreservesCase is false, depending on spark.sql.hive.caseSensitiveInferenceMode
- Add alterTableSchema() method to the ExternalCatalog interface
- Add HiveSchemaInferenceSuite tests
- Refactor and move ParquetFileFormat.mergeMetastoreParquetSchema() as HiveMetastoreCatalog.mergeWithMetastoreSchema
- Move schema merging tests from ParquetSchemaSuite to HiveSchemaInferenceSuite
[JIRA for this change](https://issues.apache.org/jira/browse/SPARK-19611)
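A hedged sketch of toggling the new option (assumes an active `SparkSession`
named `spark`; the mode names follow the Spark 2.2 documentation and should be
treated as assumptions here):
```scala
// Infer a case-sensitive schema from the data files and save it back to the
// table properties; other documented modes are INFER_ONLY and NEVER_INFER.
spark.conf.set("spark.sql.hive.caseSensitiveInferenceMode", "INFER_AND_SAVE")
```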
## How was this patch tested?
The tests in ```HiveSchemaInferenceSuite``` should verify that schema
inference is working as expected. ```ExternalCatalogSuite``` has also been
extended to cover the new ```alterTableSchema()``` API.
Author: Budde <[email protected]>
Closes #16944 from budde/SPARK-19611.
commit 82138e09b9ad8d9609d5c64d6c11244b8f230be7
Author: Burak Yavuz <[email protected]>
Date: 2017-03-10T01:42:10Z
[SPARK-19886] Fix reportDataLoss if statement in SS KafkaSource
## What changes were proposed in this pull request?
Fix the `throw new IllegalStateException` if statement part.
## How was this patch tested?
Regression test
Author: Burak Yavuz <[email protected]>
Closes #17228 from brkyvz/kafka-cause-fix.
commit 5949e6c4477fd3cb07a6962dbee48b4416ea65dd
Author: Kazuaki Ishizaki <[email protected]>
Date: 2017-03-10T06:58:52Z
[SPARK-19008][SQL] Improve performance of Dataset.map by eliminating
boxing/unboxing
## What changes were proposed in this pull request?
This PR improves the performance of Dataset.map() for primitive types by
removing boxing/unboxing operations. This is based on [the
discussion](https://github.com/apache/spark/pull/16391#discussion_r93788919)
with cloud-fan.
Currently, Catalyst generates a method call to the `apply()` method of an
anonymous function written in Scala. The types of the argument and return value
are `java.lang.Object`. As a result, each method call for a primitive value
involves a pair of unboxing and boxing for calling this `apply()` method and a
pair of boxing and unboxing for returning from this `apply()` method.
This PR directly calls a specialized version of the `apply()` method without
boxing and unboxing. For example, if the types of the argument and return value
are `int`, this PR generates a method call to `apply$mcII$sp`. This PR supports
any combination of `Int`, `Long`, `Float`, and `Double`.
The following is a benchmark result using [this
program](https://github.com/apache/spark/pull/16391/files); the Dataset case is
roughly 4.7x faster with this PR. Here is the Dataset part of this program.
Without this PR
```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz

back-to-back map:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------
RDD                                      1923 / 1952         52.0          19.2       1.0X
DataFrame                                 526 /  548        190.2           5.3       3.7X
Dataset                                  3094 / 3154         32.3          30.9       0.6X
```
With this PR
```
OpenJDK 64-Bit Server VM 1.8.0_111-8u111-b14-2ubuntu0.16.04.2-b14 on Linux 4.4.0-47-generic
Intel(R) Xeon(R) CPU E5-2667 v3 3.20GHz

back-to-back map:                  Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------
RDD                                      1883 / 1892         53.1          18.8       1.0X
DataFrame                                 502 /  642        199.1           5.0       3.7X
Dataset                                   657 /  784        152.2           6.6       2.9X
```
```scala
def backToBackMap(spark: SparkSession, numRows: Long, numChains: Int): Benchmark = {
  import spark.implicits._
  val rdd = spark.sparkContext.range(0, numRows)
  val ds = spark.range(0, numRows)
  val func = (l: Long) => l + 1
  val benchmark = new Benchmark("back-to-back map", numRows)
  ...
  benchmark.addCase("Dataset") { iter =>
    var res = ds.as[Long]
    var i = 0
    while (i < numChains) {
      res = res.map(func)
      i += 1
    }
    res.queryExecution.toRdd.foreach(_ => Unit)
  }
  benchmark
}
```
A motivating example
```scala
Seq(1, 2, 3).toDS.map(i => i * 7).show
```
Generated code without this PR
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow deserializetoobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 012 */   private int mapelements_argValue;
/* 013 */   private UnsafeRow mapelements_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 016 */   private UnsafeRow serializefromobject_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     inputadapter_input = inputs[0];
/* 028 */     deserializetoobject_result = new UnsafeRow(1);
/* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
/* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 031 */
/* 032 */     mapelements_result = new UnsafeRow(1);
/* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
/* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 035 */     serializefromobject_result = new UnsafeRow(1);
/* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   protected void processNext() throws java.io.IOException {
/* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
/* 045 */
/* 046 */       boolean mapelements_isNull = true;
/* 047 */       int mapelements_value = -1;
/* 048 */       if (!false) {
/* 049 */         mapelements_argValue = inputadapter_value;
/* 050 */
/* 051 */         mapelements_isNull = false;
/* 052 */         if (!mapelements_isNull) {
/* 053 */           Object mapelements_funcResult = null;
/* 054 */           mapelements_funcResult = ((scala.Function1) references[0]).apply(mapelements_argValue);
/* 055 */           if (mapelements_funcResult == null) {
/* 056 */             mapelements_isNull = true;
/* 057 */           } else {
/* 058 */             mapelements_value = (Integer) mapelements_funcResult;
/* 059 */           }
/* 060 */
/* 061 */         }
/* 062 */
/* 063 */       }
/* 064 */
/* 065 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 066 */
/* 067 */       if (mapelements_isNull) {
/* 068 */         serializefromobject_rowWriter.setNullAt(0);
/* 069 */       } else {
/* 070 */         serializefromobject_rowWriter.write(0, mapelements_value);
/* 071 */       }
/* 072 */       append(serializefromobject_result);
/* 073 */       if (shouldStop()) return;
/* 074 */     }
/* 075 */   }
/* 076 */ }
```
Generated code with this PR (lines 48-56 are changed)
```java
/* 005 */ final class GeneratedIterator extends org.apache.spark.sql.execution.BufferedRowIterator {
/* 006 */   private Object[] references;
/* 007 */   private scala.collection.Iterator[] inputs;
/* 008 */   private scala.collection.Iterator inputadapter_input;
/* 009 */   private UnsafeRow deserializetoobject_result;
/* 010 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder deserializetoobject_holder;
/* 011 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter deserializetoobject_rowWriter;
/* 012 */   private int mapelements_argValue;
/* 013 */   private UnsafeRow mapelements_result;
/* 014 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder mapelements_holder;
/* 015 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter mapelements_rowWriter;
/* 016 */   private UnsafeRow serializefromobject_result;
/* 017 */   private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder serializefromobject_holder;
/* 018 */   private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter serializefromobject_rowWriter;
/* 019 */
/* 020 */   public GeneratedIterator(Object[] references) {
/* 021 */     this.references = references;
/* 022 */   }
/* 023 */
/* 024 */   public void init(int index, scala.collection.Iterator[] inputs) {
/* 025 */     partitionIndex = index;
/* 026 */     this.inputs = inputs;
/* 027 */     inputadapter_input = inputs[0];
/* 028 */     deserializetoobject_result = new UnsafeRow(1);
/* 029 */     this.deserializetoobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(deserializetoobject_result, 0);
/* 030 */     this.deserializetoobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(deserializetoobject_holder, 1);
/* 031 */
/* 032 */     mapelements_result = new UnsafeRow(1);
/* 033 */     this.mapelements_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(mapelements_result, 0);
/* 034 */     this.mapelements_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(mapelements_holder, 1);
/* 035 */     serializefromobject_result = new UnsafeRow(1);
/* 036 */     this.serializefromobject_holder = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(serializefromobject_result, 0);
/* 037 */     this.serializefromobject_rowWriter = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(serializefromobject_holder, 1);
/* 038 */
/* 039 */   }
/* 040 */
/* 041 */   protected void processNext() throws java.io.IOException {
/* 042 */     while (inputadapter_input.hasNext() && !stopEarly()) {
/* 043 */       InternalRow inputadapter_row = (InternalRow) inputadapter_input.next();
/* 044 */       int inputadapter_value = inputadapter_row.getInt(0);
/* 045 */
/* 046 */       boolean mapelements_isNull = true;
/* 047 */       int mapelements_value = -1;
/* 048 */       if (!false) {
/* 049 */         mapelements_argValue = inputadapter_value;
/* 050 */
/* 051 */         mapelements_isNull = false;
/* 052 */         if (!mapelements_isNull) {
/* 053 */           mapelements_value = ((scala.Function1) references[0]).apply$mcII$sp(mapelements_argValue);
/* 054 */         }
/* 055 */
/* 056 */       }
/* 057 */
/* 058 */       serializefromobject_rowWriter.zeroOutNullBytes();
/* 059 */
/* 060 */       if (mapelements_isNull) {
/* 061 */         serializefromobject_rowWriter.setNullAt(0);
/* 062 */       } else {
/* 063 */         serializefromobject_rowWriter.write(0, mapelements_value);
/* 064 */       }
/* 065 */       append(serializefromobject_result);
/* 066 */       if (shouldStop()) return;
/* 067 */     }
/* 068 */   }
/* 069 */ }
```
Java bytecode for methods for `i => i * 7`
```java
$ javap -c Test\$\$anonfun\$5\$\$anonfun\$apply\$mcV\$sp\$1.class
Compiled from "Test.scala"
public final class org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1 extends scala.runtime.AbstractFunction1$mcII$sp implements scala.Serializable {
  public static final long serialVersionUID;

  public final int apply(int);
    Code:
       0: aload_0
       1: iload_1
       2: invokevirtual #18        // Method apply$mcII$sp:(I)I
       5: ireturn

  public int apply$mcII$sp(int);
    Code:
       0: iload_1
       1: bipush        7
       3: imul
       4: ireturn

  public final java.lang.Object apply(java.lang.Object);
    Code:
       0: aload_0
       1: aload_1
       2: invokestatic  #29        // Method scala/runtime/BoxesRunTime.unboxToInt:(Ljava/lang/Object;)I
       5: invokevirtual #31        // Method apply:(I)I
       8: invokestatic  #35        // Method scala/runtime/BoxesRunTime.boxToInteger:(I)Ljava/lang/Integer;
      11: areturn

  public org.apache.spark.sql.Test$$anonfun$5$$anonfun$apply$mcV$sp$1(org.apache.spark.sql.Test$$anonfun$5);
    Code:
       0: aload_0
       1: invokespecial #42        // Method scala/runtime/AbstractFunction1$mcII$sp."<init>":()V
       4: return
}
```
## How was this patch tested?
Added new test suites to `DatasetPrimitiveSuite`.
Author: Kazuaki Ishizaki <[email protected]>
Closes #17172 from kiszk/SPARK-19008.
----