svn commit: r24090 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_22_01-fd46a27-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-08 Thread pwendell
Author: pwendell
Date: Tue Jan  9 06:15:41 2018
New Revision: 24090

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_08_22_01-fd46a27 docs


[This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-21293][SPARKR][DOCS] structured streaming doc update

2018-01-08 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 911a4dbe7 -> a23c07ecb


[SPARK-21293][SPARKR][DOCS] structured streaming doc update

## What changes were proposed in this pull request?

Documentation update: add SparkR examples to the Structured Streaming programming guide and remove the "(experimental)" label from the SparkR Structured Streaming docs.

Author: Felix Cheung 

Closes #20197 from felixcheung/rwadoc.

(cherry picked from commit 02214b094390e913f52e71d55c9bb8a81c9e7ef9)
Signed-off-by: Felix Cheung 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a23c07ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a23c07ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a23c07ec

Branch: refs/heads/branch-2.3
Commit: a23c07ecb1dba0dbd52a0d2362d8d21e9cdd8b5a
Parents: 911a4db
Author: Felix Cheung 
Authored: Mon Jan 8 22:08:19 2018 -0800
Committer: Felix Cheung 
Committed: Mon Jan 8 22:08:34 2018 -0800

--
 R/pkg/vignettes/sparkr-vignettes.Rmd   |  2 +-
 docs/sparkr.md |  2 +-
 docs/structured-streaming-programming-guide.md | 32 +++--
 3 files changed, 32 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/a23c07ec/R/pkg/vignettes/sparkr-vignettes.Rmd
--
diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd 
b/R/pkg/vignettes/sparkr-vignettes.Rmd
index 2e66242..feca617 100644
--- a/R/pkg/vignettes/sparkr-vignettes.Rmd
+++ b/R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -1042,7 +1042,7 @@ unlink(modelPath)
 
 ## Structured Streaming
 
-SparkR supports the Structured Streaming API (experimental).
+SparkR supports the Structured Streaming API.
 
 You can check the Structured Streaming Programming Guide for [an 
introduction](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model)
 to its programming model and basic concepts.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a23c07ec/docs/sparkr.md
--
diff --git a/docs/sparkr.md b/docs/sparkr.md
index 997ea60..6685b58 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -596,7 +596,7 @@ The following example shows how to save/load a MLlib model 
by SparkR.
 
 # Structured Streaming
 
-SparkR supports the Structured Streaming API (experimental). Structured 
Streaming is a scalable and fault-tolerant stream processing engine built on 
the Spark SQL engine. For more information see the R API on the [Structured 
Streaming Programming Guide](structured-streaming-programming-guide.html)
+SparkR supports the Structured Streaming API. Structured Streaming is a 
scalable and fault-tolerant stream processing engine built on the Spark SQL 
engine. For more information see the R API on the [Structured Streaming 
Programming Guide](structured-streaming-programming-guide.html)
 
 # R Function Name Conflicts
 

http://git-wip-us.apache.org/repos/asf/spark/blob/a23c07ec/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 31fcfab..de13e28 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -827,8 +827,8 @@ df.isStreaming()
 {% endhighlight %}
 
 
-{% highlight bash %}
-Not available.
+{% highlight r %}
+isStreaming(df)
 {% endhighlight %}
 
 
@@ -886,6 +886,19 @@ windowedCounts = words.groupBy(
 {% endhighlight %}
 
 
+
+{% highlight r %}
+words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: 
String }
+
+# Group the data by window and word and compute the count of each group
+windowedCounts <- count(
+groupBy(
+  words,
+  window(words$timestamp, "10 minutes", "5 minutes"),
+  words$word))
+{% endhighlight %}
+
+
 
 
 
@@ -960,6 +973,21 @@ windowedCounts = words \
 {% endhighlight %}
 
 
+
+{% highlight r %}
+words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: 
String }
+
+# Group the data by window and word and compute the count of each group
+
+words <- withWatermark(words, "timestamp", "10 minutes")
+windowedCounts <- count(
+groupBy(
+  words,
+  window(words$timestamp, "10 minutes", "5 minutes"),
+  words$word))
+{% endhighlight %}
+
+
 
 
 In this example, we are defining the watermark of the query on the value of 
the column "timestamp", 





spark git commit: [SPARK-21293][SPARKR][DOCS] structured streaming doc update

2018-01-08 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master 8486ad419 -> 02214b094


[SPARK-21293][SPARKR][DOCS] structured streaming doc update

## What changes were proposed in this pull request?

Documentation update: add SparkR examples to the Structured Streaming programming guide and remove the "(experimental)" label from the SparkR Structured Streaming docs.

Author: Felix Cheung 

Closes #20197 from felixcheung/rwadoc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/02214b09
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/02214b09
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/02214b09

Branch: refs/heads/master
Commit: 02214b094390e913f52e71d55c9bb8a81c9e7ef9
Parents: 8486ad419
Author: Felix Cheung 
Authored: Mon Jan 8 22:08:19 2018 -0800
Committer: Felix Cheung 
Committed: Mon Jan 8 22:08:19 2018 -0800

--
 R/pkg/vignettes/sparkr-vignettes.Rmd   |  2 +-
 docs/sparkr.md |  2 +-
 docs/structured-streaming-programming-guide.md | 32 +++--
 3 files changed, 32 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/02214b09/R/pkg/vignettes/sparkr-vignettes.Rmd
--
diff --git a/R/pkg/vignettes/sparkr-vignettes.Rmd 
b/R/pkg/vignettes/sparkr-vignettes.Rmd
index 2e66242..feca617 100644
--- a/R/pkg/vignettes/sparkr-vignettes.Rmd
+++ b/R/pkg/vignettes/sparkr-vignettes.Rmd
@@ -1042,7 +1042,7 @@ unlink(modelPath)
 
 ## Structured Streaming
 
-SparkR supports the Structured Streaming API (experimental).
+SparkR supports the Structured Streaming API.
 
 You can check the Structured Streaming Programming Guide for [an 
introduction](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#programming-model)
 to its programming model and basic concepts.
 

http://git-wip-us.apache.org/repos/asf/spark/blob/02214b09/docs/sparkr.md
--
diff --git a/docs/sparkr.md b/docs/sparkr.md
index 997ea60..6685b58 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -596,7 +596,7 @@ The following example shows how to save/load a MLlib model 
by SparkR.
 
 # Structured Streaming
 
-SparkR supports the Structured Streaming API (experimental). Structured 
Streaming is a scalable and fault-tolerant stream processing engine built on 
the Spark SQL engine. For more information see the R API on the [Structured 
Streaming Programming Guide](structured-streaming-programming-guide.html)
+SparkR supports the Structured Streaming API. Structured Streaming is a 
scalable and fault-tolerant stream processing engine built on the Spark SQL 
engine. For more information see the R API on the [Structured Streaming 
Programming Guide](structured-streaming-programming-guide.html)
 
 # R Function Name Conflicts
 

http://git-wip-us.apache.org/repos/asf/spark/blob/02214b09/docs/structured-streaming-programming-guide.md
--
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index 31fcfab..de13e28 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -827,8 +827,8 @@ df.isStreaming()
 {% endhighlight %}
 
 
-{% highlight bash %}
-Not available.
+{% highlight r %}
+isStreaming(df)
 {% endhighlight %}
 
 
@@ -886,6 +886,19 @@ windowedCounts = words.groupBy(
 {% endhighlight %}
 
 
+
+{% highlight r %}
+words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: 
String }
+
+# Group the data by window and word and compute the count of each group
+windowedCounts <- count(
+groupBy(
+  words,
+  window(words$timestamp, "10 minutes", "5 minutes"),
+  words$word))
+{% endhighlight %}
+
+
 
 
 
@@ -960,6 +973,21 @@ windowedCounts = words \
 {% endhighlight %}
 
 
+
+{% highlight r %}
+words <- ...  # streaming DataFrame of schema { timestamp: Timestamp, word: 
String }
+
+# Group the data by window and word and compute the count of each group
+
+words <- withWatermark(words, "timestamp", "10 minutes")
+windowedCounts <- count(
+groupBy(
+  words,
+  window(words$timestamp, "10 minutes", "5 minutes"),
+  words$word))
+{% endhighlight %}
+
+
 
 
 In this example, we are defining the watermark of the query on the value of 
the column "timestamp", 





spark git commit: [SPARK-21292][DOCS] refreshtable example

2018-01-08 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 fd46a276c -> 911a4dbe7


[SPARK-21292][DOCS] refreshtable example

## What changes were proposed in this pull request?

Documentation update: add a SparkR `refreshTable` example to the SQL programming guide.

Author: Felix Cheung 

Closes #20198 from felixcheung/rrefreshdoc.

(cherry picked from commit 8486ad419d8f1779e277ec71c39e1516673a83ab)
Signed-off-by: Felix Cheung 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/911a4dbe
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/911a4dbe
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/911a4dbe

Branch: refs/heads/branch-2.3
Commit: 911a4dbe7da45dbb62ed08b0c99bb97ea4fc9e3e
Parents: fd46a27
Author: Felix Cheung 
Authored: Mon Jan 8 21:58:26 2018 -0800
Committer: Felix Cheung 
Committed: Mon Jan 8 22:02:04 2018 -0800

--
 docs/sql-programming-guide.md | 16 
 1 file changed, 12 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/911a4dbe/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 3ccaaf4..72f79d6 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -915,6 +915,14 @@ spark.catalog.refreshTable("my_table")
 
 
 
+
+
+{% highlight r %}
+refreshTable("my_table")
+{% endhighlight %}
+
+
+
 
 
 {% highlight sql %}
@@ -1498,10 +1506,10 @@ that these options will be deprecated in future release 
as more optimizations ar
 ## Broadcast Hint for SQL Queries
 
 The `BROADCAST` hint guides Spark to broadcast each specified table when 
joining them with another table or view.
-When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is 
preferred, 
+When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is 
preferred,
 even if the statistics is above the configuration 
`spark.sql.autoBroadcastJoinThreshold`.
 When both sides of a join are specified, Spark broadcasts the one having the 
lower statistics.
-Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. 
full outer join) 
+Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. 
full outer join)
 support BHJ. When the broadcast nested loop join is selected, we still respect 
the hint.
 
 
@@ -1780,7 +1788,7 @@ options.
 Note that, for DecimalType(38,0)*, the table above intentionally 
does not cover all other combinations of scales and precisions because 
currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 
1.1 is inferred as double type.
   - In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas 
related functionalities, such as `toPandas`, `createDataFrame` from Pandas 
DataFrame, etc.
   - In PySpark, the behavior of timestamp values for Pandas related 
functionalities was changed to respect session timezone. If you want to use the 
old behavior, you need to set a configuration 
`spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See 
[SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details.
- 
+
  - Since Spark 2.3, when either broadcast hash join or broadcast nested loop 
join is applicable, we prefer to broadcasting the table that is explicitly 
specified in a broadcast hint. For details, see the section [Broadcast 
Hint](#broadcast-hint-for-sql-queries) and 
[SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489).
 
  - Since Spark 2.3, when all inputs are binary, `functions.concat()` returns 
an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it 
always returns as a string despite of input types. To keep the old behavior, 
set `spark.sql.function.concatBinaryAsString` to `true`.
@@ -2167,7 +2175,7 @@ Not all the APIs of the Hive UDF/UDTF/UDAF are supported 
by Spark SQL. Below are
   Spark SQL currently does not support the reuse of aggregation.
 * `getWindowingEvaluator` (`GenericUDAFEvaluator`) is a function to optimize 
aggregation by evaluating
   an aggregate over a fixed window.
-  
+
 ### Incompatible Hive UDF
 
 Below are the scenarios in which Hive and Spark generate different results:





spark git commit: [SPARK-21292][DOCS] refreshtable example

2018-01-08 Thread felixcheung
Repository: spark
Updated Branches:
  refs/heads/master f20131dd3 -> 8486ad419


[SPARK-21292][DOCS] refreshtable example

## What changes were proposed in this pull request?

Documentation update: add a SparkR `refreshTable` example to the SQL programming guide.

Author: Felix Cheung 

Closes #20198 from felixcheung/rrefreshdoc.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8486ad41
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8486ad41
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8486ad41

Branch: refs/heads/master
Commit: 8486ad419d8f1779e277ec71c39e1516673a83ab
Parents: f20131d
Author: Felix Cheung 
Authored: Mon Jan 8 21:58:26 2018 -0800
Committer: Felix Cheung 
Committed: Mon Jan 8 21:58:26 2018 -0800

--
 docs/sql-programming-guide.md | 16 
 1 file changed, 12 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8486ad41/docs/sql-programming-guide.md
--
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 3ccaaf4..72f79d6 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -915,6 +915,14 @@ spark.catalog.refreshTable("my_table")
 
 
 
+
+
+{% highlight r %}
+refreshTable("my_table")
+{% endhighlight %}
+
+
+
 
 
 {% highlight sql %}
@@ -1498,10 +1506,10 @@ that these options will be deprecated in future release 
as more optimizations ar
 ## Broadcast Hint for SQL Queries
 
 The `BROADCAST` hint guides Spark to broadcast each specified table when 
joining them with another table or view.
-When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is 
preferred, 
+When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is 
preferred,
 even if the statistics is above the configuration 
`spark.sql.autoBroadcastJoinThreshold`.
 When both sides of a join are specified, Spark broadcasts the one having the 
lower statistics.
-Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. 
full outer join) 
+Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. 
full outer join)
 support BHJ. When the broadcast nested loop join is selected, we still respect 
the hint.
 
 
@@ -1780,7 +1788,7 @@ options.
 Note that, for DecimalType(38,0)*, the table above intentionally 
does not cover all other combinations of scales and precisions because 
currently we only infer decimal type like `BigInteger`/`BigInt`. For example, 
1.1 is inferred as double type.
   - In PySpark, now we need Pandas 0.19.2 or upper if you want to use Pandas 
related functionalities, such as `toPandas`, `createDataFrame` from Pandas 
DataFrame, etc.
   - In PySpark, the behavior of timestamp values for Pandas related 
functionalities was changed to respect session timezone. If you want to use the 
old behavior, you need to set a configuration 
`spark.sql.execution.pandas.respectSessionTimeZone` to `False`. See 
[SPARK-22395](https://issues.apache.org/jira/browse/SPARK-22395) for details.
- 
+
  - Since Spark 2.3, when either broadcast hash join or broadcast nested loop 
join is applicable, we prefer to broadcasting the table that is explicitly 
specified in a broadcast hint. For details, see the section [Broadcast 
Hint](#broadcast-hint-for-sql-queries) and 
[SPARK-22489](https://issues.apache.org/jira/browse/SPARK-22489).
 
  - Since Spark 2.3, when all inputs are binary, `functions.concat()` returns 
an output as binary. Otherwise, it returns as a string. Until Spark 2.3, it 
always returns as a string despite of input types. To keep the old behavior, 
set `spark.sql.function.concatBinaryAsString` to `true`.
@@ -2167,7 +2175,7 @@ Not all the APIs of the Hive UDF/UDTF/UDAF are supported 
by Spark SQL. Below are
   Spark SQL currently does not support the reuse of aggregation.
 * `getWindowingEvaluator` (`GenericUDAFEvaluator`) is a function to optimize 
aggregation by evaluating
   an aggregate over a fixed window.
-  
+
 ### Incompatible Hive UDF
 
 Below are the scenarios in which Hive and Spark generate different results:





svn commit: r24089 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_20_01-f20131d-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-08 Thread pwendell
Author: pwendell
Date: Tue Jan  9 04:15:10 2018
New Revision: 24089

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_08_20_01-f20131d docs


[This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-22984] Fix incorrect bitmap copying and offset adjustment in GenerateUnsafeRowJoiner

2018-01-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 7c30ae39f -> 24f1f2a54


[SPARK-22984] Fix incorrect bitmap copying and offset adjustment in 
GenerateUnsafeRowJoiner

## What changes were proposed in this pull request?

This PR fixes a longstanding correctness bug in `GenerateUnsafeRowJoiner`. This 
class was introduced in https://github.com/apache/spark/pull/7821 (July 2015 / 
Spark 1.5.0+) and is used to combine pairs of UnsafeRows in 
TungstenAggregationIterator, CartesianProductExec, and AppendColumns.

### Bugs fixed by this patch

1. **Incorrect combining of null-tracking bitmaps**: when concatenating two UnsafeRows, the implementation is supposed to "Concatenate the two bitsets together into a single one, taking padding into account". If one row has no columns then it has a bitset size of 0, but the code incorrectly assumed that if the left row had a non-zero number of fields then the right row would also have at least one field, so it was copying invalid bytes and treating them as part of the bitset. I'm not sure whether this bug was also present in the original implementation or whether it was introduced in https://github.com/apache/spark/pull/7892 (which fixed another bug in this code).
2. **Incorrect updating of data offsets for null variable-length fields**: after updating the bitsets and copying fixed-length and variable-length data, we need to adjust the offsets that point to the start of each variable-length field's data. The existing code was _conditionally_ adding a fixed offset to correct for the new length of the combined row, but it is unsafe to do this if the variable-length field has a null value: we always represent nulls by storing `0` in the fixed-length slot, yet this code was incorrectly incrementing those values (see the conceptual sketch after this list). This bug has been present since the original version of `GenerateUnsafeRowJoiner`.
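
For illustration, here is a conceptual sketch of the offset fix-up described in point 2. This is not Spark's actual code; the packed `(offset << 32) | size` slot layout and the `0` null sentinel are assumed from the description above:

```scala
// Conceptual sketch only: a variable-length field's fixed-width slot packs
// (offset << 32) | size, while a null field stores the sentinel 0L in that slot.
def adjustVariableLengthSlot(slot: Long, extraBytes: Long): Long =
  if (slot == 0L) 0L                 // null field: leave the sentinel untouched
  else slot + (extraBytes << 32)     // non-null field: shift only the offset half

val nullSlot = 0L
val dataSlot = (16L << 32) | 8L               // data at offset 16, 8 bytes long
adjustVariableLengthSlot(nullSlot, 32L)       // 0L -- still recognizably null
adjustVariableLengthSlot(dataSlot, 32L)       // offset becomes 48, size stays 8
// The buggy path effectively applied the `else` branch even when slot == 0L,
// turning the null marker into a bogus non-zero value.
```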

### Why this bug remained latent for so long

The PR which introduced `GenerateUnsafeRowJoiner` features several randomized 
tests, including tests of the cases where one side of the join has no fields 
and where string-valued fields are null. However, the existing assertions were 
too weak to uncover this bug:

- If a null field has a non-zero value in its fixed-length data slot then this 
will not cause problems for field accesses because the null-tracking bitmap 
should still be correct and we will not try to use the incorrect offset for 
anything.
- If the null tracking bitmap is corrupted by joining against a row with no 
fields then the corruption occurs in field numbers past the actual field 
numbers contained in the row. Thus valid `isNullAt()` calls will not read the 
incorrectly-set bits.

The existing `GenerateUnsafeRowJoinerSuite` tests only exercised `.get()` and 
`isNullAt()`, but didn't actually check the UnsafeRows for bit-for-bit 
equality, preventing these bugs from failing assertions. It turns out that 
there was even a 
[GenerateUnsafeRowJoinerBitsetSuite](https://github.com/apache/spark/blob/03377d2522776267a07b7d6ae9bddf79a4e0f516/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala)
 but it looks like it also didn't catch this problem because it only tested the 
bitsets in an end-to-end fashion by accessing them through the `UnsafeRow` 
interface instead of actually comparing the bitsets' bytes.
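
As a rough sketch of the stronger assertion described above (assuming `UnsafeRow` exposes its backing bytes via `getBytes`; the helper name is made up):

```scala
import java.util.Arrays

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Hypothetical helper: require two UnsafeRows to match bit for bit, so corrupted
// padding bits or overwritten null sentinels cannot slip past the field accessors.
def assertBitwiseEqual(actual: UnsafeRow, expected: UnsafeRow): Unit = {
  assert(Arrays.equals(actual.getBytes, expected.getBytes),
    s"UnsafeRow bytes differ: actual=$actual expected=$expected")
}
```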

### Impact of these bugs

- This bug will cause `equals()` and `hashCode()` to be incorrect for these 
rows, which will be problematic in case `GenerateUnsafeRowJoiner`'s results are 
used as join or grouping keys.
- Chained / repeated invocations of `GenerateUnsafeRowJoiner` may result in 
reads from invalid null bitmap positions causing fields to incorrectly become 
NULL (see the end-to-end example below).
  - It looks like this generally only happens in `CartesianProductExec`, which 
our query optimizer often avoids executing (usually we try to plan a 
`BroadcastNestedLoopJoin` instead).

### End-to-end test case demonstrating the problem

The following query demonstrates how this bug may result in incorrect query 
results:

```sql
set spark.sql.autoBroadcastJoinThreshold=-1; -- Needed to trigger 
CartesianProductExec

create table a as select * from values 1;
create table b as select * from values 2;

SELECT
  t3.col1,
  t1.col1
FROM a t1
CROSS JOIN b t2
CROSS JOIN b t3
```

This should return `(2, 1)` but instead was returning `(null, 1)`.

Column pruning ends up trimming off all columns from `t2`, so when `t2` joins 
with another table this triggers the bitmap-copying bug. This incorrect bitmap 
is subsequently copied again when performing the final join, causing the final 
output to have an incorrectly-set null bit for the first field.

## How was this patch tested?

Strengthened the assertions in existing tests in GenerateUnsafeRowJoinerSuite. 
Also verified that the end-to-end test case which uncovered this now passes.

Author: Josh Rosen 

Closes #20181 from 
JoshRosen/SPARK

spark git commit: [SPARK-22984] Fix incorrect bitmap copying and offset adjustment in GenerateUnsafeRowJoiner

2018-01-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 850b9f391 -> fd46a276c


[SPARK-22984] Fix incorrect bitmap copying and offset adjustment in 
GenerateUnsafeRowJoiner

## What changes were proposed in this pull request?

This PR fixes a longstanding correctness bug in `GenerateUnsafeRowJoiner`. This 
class was introduced in https://github.com/apache/spark/pull/7821 (July 2015 / 
Spark 1.5.0+) and is used to combine pairs of UnsafeRows in 
TungstenAggregationIterator, CartesianProductExec, and AppendColumns.

### Bugs fixed by this patch

1. **Incorrect combining of null-tracking bitmaps**: when concatenating two UnsafeRows, the implementation is supposed to "Concatenate the two bitsets together into a single one, taking padding into account". If one row has no columns then it has a bitset size of 0, but the code incorrectly assumed that if the left row had a non-zero number of fields then the right row would also have at least one field, so it was copying invalid bytes and treating them as part of the bitset. I'm not sure whether this bug was also present in the original implementation or whether it was introduced in https://github.com/apache/spark/pull/7892 (which fixed another bug in this code).
2. **Incorrect updating of data offsets for null variable-length fields**: after updating the bitsets and copying fixed-length and variable-length data, we need to adjust the offsets that point to the start of each variable-length field's data. The existing code was _conditionally_ adding a fixed offset to correct for the new length of the combined row, but it is unsafe to do this if the variable-length field has a null value: we always represent nulls by storing `0` in the fixed-length slot, yet this code was incorrectly incrementing those values. This bug has been present since the original version of `GenerateUnsafeRowJoiner`.

### Why this bug remained latent for so long

The PR which introduced `GenerateUnsafeRowJoiner` features several randomized 
tests, including tests of the cases where one side of the join has no fields 
and where string-valued fields are null. However, the existing assertions were 
too weak to uncover this bug:

- If a null field has a non-zero value in its fixed-length data slot then this 
will not cause problems for field accesses because the null-tracking bitmap 
should still be correct and we will not try to use the incorrect offset for 
anything.
- If the null tracking bitmap is corrupted by joining against a row with no 
fields then the corruption occurs in field numbers past the actual field 
numbers contained in the row. Thus valid `isNullAt()` calls will not read the 
incorrectly-set bits.

The existing `GenerateUnsafeRowJoinerSuite` tests only exercised `.get()` and 
`isNullAt()`, but didn't actually check the UnsafeRows for bit-for-bit 
equality, preventing these bugs from failing assertions. It turns out that 
there was even a 
[GenerateUnsafeRowJoinerBitsetSuite](https://github.com/apache/spark/blob/03377d2522776267a07b7d6ae9bddf79a4e0f516/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala)
 but it looks like it also didn't catch this problem because it only tested the 
bitsets in an end-to-end fashion by accessing them through the `UnsafeRow` 
interface instead of actually comparing the bitsets' bytes.

### Impact of these bugs

- This bug will cause `equals()` and `hashCode()` to be incorrect for these 
rows, which will be problematic in case `GenerateUnsafeRowJoiner`'s results are 
used as join or grouping keys.
- Chained / repeated invocations of `GenerateUnsafeRowJoiner` may result in 
reads from invalid null bitmap positions causing fields to incorrectly become 
NULL (see the end-to-end example below).
  - It looks like this generally only happens in `CartesianProductExec`, which 
our query optimizer often avoids executing (usually we try to plan a 
`BroadcastNestedLoopJoin` instead).

### End-to-end test case demonstrating the problem

The following query demonstrates how this bug may result in incorrect query 
results:

```sql
set spark.sql.autoBroadcastJoinThreshold=-1; -- Needed to trigger 
CartesianProductExec

create table a as select * from values 1;
create table b as select * from values 2;

SELECT
  t3.col1,
  t1.col1
FROM a t1
CROSS JOIN b t2
CROSS JOIN b t3
```

This should return `(2, 1)` but instead was returning `(null, 1)`.

Column pruning ends up trimming off all columns from `t2`, so when `t2` joins 
with another table this triggers the bitmap-copying bug. This incorrect bitmap 
is subsequently copied again when performing the final join, causing the final 
output to have an incorrectly-set null bit for the first field.

## How was this patch tested?

Strengthened the assertions in existing tests in GenerateUnsafeRowJoinerSuite. 
Also verified that the end-to-end test case which uncovered this now passes.

Author: Josh Rosen 

Closes #20181 from 
JoshRosen/SPARK

spark git commit: [SPARK-22984] Fix incorrect bitmap copying and offset adjustment in GenerateUnsafeRowJoiner

2018-01-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 849043ce1 -> f20131dd3


[SPARK-22984] Fix incorrect bitmap copying and offset adjustment in 
GenerateUnsafeRowJoiner

## What changes were proposed in this pull request?

This PR fixes a longstanding correctness bug in `GenerateUnsafeRowJoiner`. This 
class was introduced in https://github.com/apache/spark/pull/7821 (July 2015 / 
Spark 1.5.0+) and is used to combine pairs of UnsafeRows in 
TungstenAggregationIterator, CartesianProductExec, and AppendColumns.

### Bugs fixed by this patch

1. **Incorrect combining of null-tracking bitmaps**: when concatenating two UnsafeRows, the implementation is supposed to "Concatenate the two bitsets together into a single one, taking padding into account". If one row has no columns then it has a bitset size of 0, but the code incorrectly assumed that if the left row had a non-zero number of fields then the right row would also have at least one field, so it was copying invalid bytes and treating them as part of the bitset. I'm not sure whether this bug was also present in the original implementation or whether it was introduced in https://github.com/apache/spark/pull/7892 (which fixed another bug in this code).
2. **Incorrect updating of data offsets for null variable-length fields**: after updating the bitsets and copying fixed-length and variable-length data, we need to adjust the offsets that point to the start of each variable-length field's data. The existing code was _conditionally_ adding a fixed offset to correct for the new length of the combined row, but it is unsafe to do this if the variable-length field has a null value: we always represent nulls by storing `0` in the fixed-length slot, yet this code was incorrectly incrementing those values. This bug has been present since the original version of `GenerateUnsafeRowJoiner`.

### Why this bug remained latent for so long

The PR which introduced `GenerateUnsafeRowJoiner` features several randomized 
tests, including tests of the cases where one side of the join has no fields 
and where string-valued fields are null. However, the existing assertions were 
too weak to uncover this bug:

- If a null field has a non-zero value in its fixed-length data slot then this 
will not cause problems for field accesses because the null-tracking bitmap 
should still be correct and we will not try to use the incorrect offset for 
anything.
- If the null tracking bitmap is corrupted by joining against a row with no 
fields then the corruption occurs in field numbers past the actual field 
numbers contained in the row. Thus valid `isNullAt()` calls will not read the 
incorrectly-set bits.

The existing `GenerateUnsafeRowJoinerSuite` tests only exercised `.get()` and 
`isNullAt()`, but didn't actually check the UnsafeRows for bit-for-bit 
equality, preventing these bugs from failing assertions. It turns out that 
there was even a 
[GenerateUnsafeRowJoinerBitsetSuite](https://github.com/apache/spark/blob/03377d2522776267a07b7d6ae9bddf79a4e0f516/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerBitsetSuite.scala)
 but it looks like it also didn't catch this problem because it only tested the 
bitsets in an end-to-end fashion by accessing them through the `UnsafeRow` 
interface instead of actually comparing the bitsets' bytes.

### Impact of these bugs

- This bug will cause `equals()` and `hashCode()` to be incorrect for these 
rows, which will be problematic in case `GenerateUnsafeRowJoiner`'s results are 
used as join or grouping keys.
- Chained / repeated invocations of `GenerateUnsafeRowJoiner` may result in 
reads from invalid null bitmap positions causing fields to incorrectly become 
NULL (see the end-to-end example below).
  - It looks like this generally only happens in `CartesianProductExec`, which 
our query optimizer often avoids executing (usually we try to plan a 
`BroadcastNestedLoopJoin` instead).

### End-to-end test case demonstrating the problem

The following query demonstrates how this bug may result in incorrect query 
results:

```sql
set spark.sql.autoBroadcastJoinThreshold=-1; -- Needed to trigger 
CartesianProductExec

create table a as select * from values 1;
create table b as select * from values 2;

SELECT
  t3.col1,
  t1.col1
FROM a t1
CROSS JOIN b t2
CROSS JOIN b t3
```

This should return `(2, 1)` but instead was returning `(null, 1)`.

Column pruning ends up trimming off all columns from `t2`, so when `t2` joins 
with another table this triggers the bitmap-copying bug. This incorrect bitmap 
is subsequently copied again when performing the final join, causing the final 
output to have an incorrectly-set null bit for the first field.

## How was this patch tested?

Strengthened the assertions in existing tests in GenerateUnsafeRowJoinerSuite. 
Also verified that the end-to-end test case which uncovered this now passes.

Author: Josh Rosen 

Closes #20181 from 
JoshRosen/SPARK-229

spark git commit: [SPARK-22990][CORE] Fix method isFairScheduler in JobsTab and StagesTab

2018-01-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 68ce792b5 -> 849043ce1


[SPARK-22990][CORE] Fix method isFairScheduler in JobsTab and StagesTab

## What changes were proposed in this pull request?

In the current implementation, the function `isFairScheduler` always returns false, since it compares a String with a `SchedulingMode` value.
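
A minimal, self-contained illustration of the mismatch, using a stand-in enumeration rather than Spark's actual classes:

```scala
// Stand-in for org.apache.spark.scheduler.SchedulingMode (shape assumed).
object SchedulingMode extends Enumeration {
  val FAIR, FIFO = Value
}

// The UI status store exposes Spark properties as (String, String) pairs.
val sparkProperties: Seq[(String, String)] = Seq("spark.scheduler.mode" -> "FAIR")

// Old check: a String is compared with an enumeration value, so this is always false.
val oldIsFair = sparkProperties.toMap
  .get("spark.scheduler.mode")
  .map(mode => mode == SchedulingMode.FAIR)
  .getOrElse(false)                                                   // false

// Fixed check: compare string to string.
val newIsFair = sparkProperties
  .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString))   // true
```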

Author: Wang Gengliang 

Closes #20186 from gengliangwang/isFairScheduler.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/849043ce
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/849043ce
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/849043ce

Branch: refs/heads/master
Commit: 849043ce1d28a976659278d29368da0799329db8
Parents: 68ce792
Author: Wang Gengliang 
Authored: Tue Jan 9 10:44:21 2018 +0800
Committer: Wenchen Fan 
Committed: Tue Jan 9 10:44:21 2018 +0800

--
 core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala   | 8 
 core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 8 
 2 files changed, 8 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/849043ce/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 99eab1b..ff1b75e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -34,10 +34,10 @@ private[ui] class JobsTab(parent: SparkUI, store: 
AppStatusStore)
   val killEnabled = parent.killEnabled
 
   def isFairScheduler: Boolean = {
-store.environmentInfo().sparkProperties.toMap
-  .get("spark.scheduler.mode")
-  .map { mode => mode == SchedulingMode.FAIR }
-  .getOrElse(false)
+store
+  .environmentInfo()
+  .sparkProperties
+  .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString))
   }
 
   def getSparkUser: String = parent.getSparkUser

http://git-wip-us.apache.org/repos/asf/spark/blob/849043ce/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index be05a96..10b0320 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -37,10 +37,10 @@ private[ui] class StagesTab(val parent: SparkUI, val store: 
AppStatusStore)
   attachPage(new PoolPage(this))
 
   def isFairScheduler: Boolean = {
-store.environmentInfo().sparkProperties.toMap
-  .get("spark.scheduler.mode")
-  .map { mode => mode == SchedulingMode.FAIR }
-  .getOrElse(false)
+store
+  .environmentInfo()
+  .sparkProperties
+  .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString))
   }
 
   def handleKillRequest(request: HttpServletRequest): Unit = {





spark git commit: [SPARK-22990][CORE] Fix method isFairScheduler in JobsTab and StagesTab

2018-01-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 8032cf852 -> 850b9f391


[SPARK-22990][CORE] Fix method isFairScheduler in JobsTab and StagesTab

## What changes were proposed in this pull request?

In the current implementation, the function `isFairScheduler` always returns false, since it compares a String with a `SchedulingMode` value.

Author: Wang Gengliang 

Closes #20186 from gengliangwang/isFairScheduler.

(cherry picked from commit 849043ce1d28a976659278d29368da0799329db8)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/850b9f39
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/850b9f39
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/850b9f39

Branch: refs/heads/branch-2.3
Commit: 850b9f39186665fd727737a98b29abe5236830db
Parents: 8032cf8
Author: Wang Gengliang 
Authored: Tue Jan 9 10:44:21 2018 +0800
Committer: Wenchen Fan 
Committed: Tue Jan 9 10:44:42 2018 +0800

--
 core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala   | 8 
 core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala | 8 
 2 files changed, 8 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/850b9f39/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
index 99eab1b..ff1b75e 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/JobsTab.scala
@@ -34,10 +34,10 @@ private[ui] class JobsTab(parent: SparkUI, store: 
AppStatusStore)
   val killEnabled = parent.killEnabled
 
   def isFairScheduler: Boolean = {
-store.environmentInfo().sparkProperties.toMap
-  .get("spark.scheduler.mode")
-  .map { mode => mode == SchedulingMode.FAIR }
-  .getOrElse(false)
+store
+  .environmentInfo()
+  .sparkProperties
+  .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString))
   }
 
   def getSparkUser: String = parent.getSparkUser

http://git-wip-us.apache.org/repos/asf/spark/blob/850b9f39/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
--
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala 
b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
index be05a96..10b0320 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagesTab.scala
@@ -37,10 +37,10 @@ private[ui] class StagesTab(val parent: SparkUI, val store: 
AppStatusStore)
   attachPage(new PoolPage(this))
 
   def isFairScheduler: Boolean = {
-store.environmentInfo().sparkProperties.toMap
-  .get("spark.scheduler.mode")
-  .map { mode => mode == SchedulingMode.FAIR }
-  .getOrElse(false)
+store
+  .environmentInfo()
+  .sparkProperties
+  .contains(("spark.scheduler.mode", SchedulingMode.FAIR.toString))
   }
 
   def handleKillRequest(request: HttpServletRequest): Unit = {





spark git commit: [SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc

2018-01-08 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 eecd83cb2 -> 8032cf852


[SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider 
org.apache.spark.sql.hive.orc

## What changes were proposed in this pull request?
Fix the warning: Couldn't find corresponding Hive SerDe for data source 
provider org.apache.spark.sql.hive.orc.

## How was this patch tested?
 test("SPARK-22972: hive orc source")
assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc")
  .equals(HiveSerDe.sourceToSerDe("orc")))

Author: xubo245 <601450...@qq.com>

Closes #20165 from xubo245/HiveSerDe.

(cherry picked from commit 68ce792b5857f0291154f524ac651036db868bb9)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8032cf85
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8032cf85
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8032cf85

Branch: refs/heads/branch-2.3
Commit: 8032cf852fccd0ab8754f633affdc9ba8fc99e58
Parents: eecd83c
Author: xubo245 <601450...@qq.com>
Authored: Tue Jan 9 10:15:01 2018 +0800
Committer: gatorsmile 
Committed: Tue Jan 9 10:15:45 2018 +0800

--
 .../apache/spark/sql/internal/HiveSerDe.scala   |  1 +
 .../spark/sql/hive/orc/HiveOrcSourceSuite.scala | 29 
 2 files changed, 30 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/8032cf85/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index b9515ec..dac4636 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -73,6 +73,7 @@ object HiveSerDe {
 val key = source.toLowerCase(Locale.ROOT) match {
   case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
   case s if s.startsWith("org.apache.spark.sql.orc") => "orc"
+  case s if s.startsWith("org.apache.spark.sql.hive.orc") => "orc"
   case s if s.equals("orcfile") => "orc"
   case s if s.equals("parquetfile") => "parquet"
   case s if s.equals("avrofile") => "avro"

http://git-wip-us.apache.org/repos/asf/spark/blob/8032cf85/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
index 17b7d8c..d556a03 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.hive.orc
 import java.io.File
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.datasources.orc.OrcSuite
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.util.Utils
 
 class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
@@ -62,6 +64,33 @@ class HiveOrcSourceSuite extends OrcSuite with 
TestHiveSingleton {
""".stripMargin)
   }
 
+  test("SPARK-22972: hive orc source") {
+val tableName = "normal_orc_as_source_hive"
+withTable(tableName) {
+  sql(
+s"""
+  |CREATE TABLE $tableName
+  |USING org.apache.spark.sql.hive.orc
+  |OPTIONS (
+  |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
+  |)
+""".stripMargin)
+
+  val tableMetadata = spark.sessionState.catalog.getTableMetadata(
+TableIdentifier(tableName))
+  assert(tableMetadata.storage.inputFormat ==
+Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
+  assert(tableMetadata.storage.outputFormat ==
+Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+  assert(tableMetadata.storage.serde ==
+Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+  assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc")
+.equals(HiveSerDe.sourceToSerDe("orc")))
+  assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.orc")
+.equals(HiveSerDe.sourceToSerDe("orc")))
+}
+  }
+
   test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
 val location = Utils.createTempDir()
 val uri = location.toURI



spark git commit: [SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider org.apache.spark.sql.hive.orc

2018-01-08 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 4f7e75883 -> 68ce792b5


[SPARK-22972] Couldn't find corresponding Hive SerDe for data source provider 
org.apache.spark.sql.hive.orc

## What changes were proposed in this pull request?
Fix the warning: Couldn't find corresponding Hive SerDe for data source 
provider org.apache.spark.sql.hive.orc.

## How was this patch tested?
 test("SPARK-22972: hive orc source")
assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc")
  .equals(HiveSerDe.sourceToSerDe("orc")))

Author: xubo245 <601450...@qq.com>

Closes #20165 from xubo245/HiveSerDe.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/68ce792b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/68ce792b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/68ce792b

Branch: refs/heads/master
Commit: 68ce792b5857f0291154f524ac651036db868bb9
Parents: 4f7e758
Author: xubo245 <601450...@qq.com>
Authored: Tue Jan 9 10:15:01 2018 +0800
Committer: gatorsmile 
Committed: Tue Jan 9 10:15:01 2018 +0800

--
 .../apache/spark/sql/internal/HiveSerDe.scala   |  1 +
 .../spark/sql/hive/orc/HiveOrcSourceSuite.scala | 29 
 2 files changed, 30 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/68ce792b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
index b9515ec..dac4636 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/internal/HiveSerDe.scala
@@ -73,6 +73,7 @@ object HiveSerDe {
 val key = source.toLowerCase(Locale.ROOT) match {
   case s if s.startsWith("org.apache.spark.sql.parquet") => "parquet"
   case s if s.startsWith("org.apache.spark.sql.orc") => "orc"
+  case s if s.startsWith("org.apache.spark.sql.hive.orc") => "orc"
   case s if s.equals("orcfile") => "orc"
   case s if s.equals("parquetfile") => "parquet"
   case s if s.equals("avrofile") => "avro"

http://git-wip-us.apache.org/repos/asf/spark/blob/68ce792b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
--
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
index 17b7d8c..d556a03 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/orc/HiveOrcSourceSuite.scala
@@ -20,8 +20,10 @@ package org.apache.spark.sql.hive.orc
 import java.io.File
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.execution.datasources.orc.OrcSuite
 import org.apache.spark.sql.hive.test.TestHiveSingleton
+import org.apache.spark.sql.internal.HiveSerDe
 import org.apache.spark.util.Utils
 
 class HiveOrcSourceSuite extends OrcSuite with TestHiveSingleton {
@@ -62,6 +64,33 @@ class HiveOrcSourceSuite extends OrcSuite with 
TestHiveSingleton {
""".stripMargin)
   }
 
+  test("SPARK-22972: hive orc source") {
+val tableName = "normal_orc_as_source_hive"
+withTable(tableName) {
+  sql(
+s"""
+  |CREATE TABLE $tableName
+  |USING org.apache.spark.sql.hive.orc
+  |OPTIONS (
+  |  PATH '${new File(orcTableAsDir.getAbsolutePath).toURI}'
+  |)
+""".stripMargin)
+
+  val tableMetadata = spark.sessionState.catalog.getTableMetadata(
+TableIdentifier(tableName))
+  assert(tableMetadata.storage.inputFormat ==
+Option("org.apache.hadoop.hive.ql.io.orc.OrcInputFormat"))
+  assert(tableMetadata.storage.outputFormat ==
+Option("org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat"))
+  assert(tableMetadata.storage.serde ==
+Option("org.apache.hadoop.hive.ql.io.orc.OrcSerde"))
+  assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.hive.orc")
+.equals(HiveSerDe.sourceToSerDe("orc")))
+  assert(HiveSerDe.sourceToSerDe("org.apache.spark.sql.orc")
+.equals(HiveSerDe.sourceToSerDe("orc")))
+}
+  }
+
   test("SPARK-19459/SPARK-18220: read char/varchar column written by Hive") {
 val location = Utils.createTempDir()
 val uri = location.toURI





svn commit: r24086 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_16_01-4f7e758-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-08 Thread pwendell
Author: pwendell
Date: Tue Jan  9 00:15:01 2018
New Revision: 24086

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_08_16_01-4f7e758 docs


[This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to this summary.]




svn commit: r24085 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_14_01-eecd83c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-08 Thread pwendell
Author: pwendell
Date: Mon Jan  8 22:17:41 2018
New Revision: 24085

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_08_14_01-eecd83c docs


[This commit notification would consist of 1439 parts, which exceeds the limit of 50, so it was shortened to this summary.]




spark git commit: [SPARK-22912] v2 data source support in MicroBatchExecution

2018-01-08 Thread tdas
Repository: spark
Updated Branches:
  refs/heads/master eed82a0b2 -> 4f7e75883


[SPARK-22912] v2 data source support in MicroBatchExecution

## What changes were proposed in this pull request?

Support for v2 data sources in microbatch streaming.

## How was this patch tested?

A very basic new unit test on the toy v2 implementation of the rate source. Once we have a v1 source fully migrated to v2, we'll need to do more detailed compatibility testing.

Author: Jose Torres 

Closes #20097 from jose-torres/v2-impl.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4f7e7588
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4f7e7588
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4f7e7588

Branch: refs/heads/master
Commit: 4f7e75883436069c2d9028c4cd5daa78e8d59560
Parents: eed82a0
Author: Jose Torres 
Authored: Mon Jan 8 13:24:08 2018 -0800
Committer: Tathagata Das 
Committed: Mon Jan 8 13:24:08 2018 -0800

--
 apache.spark.sql.sources.DataSourceRegister |   1 +
 .../datasources/v2/DataSourceV2Relation.scala   |  10 ++
 .../streaming/MicroBatchExecution.scala | 112 +++
 .../execution/streaming/ProgressReporter.scala  |   6 +-
 .../streaming/RateSourceProvider.scala  |  10 +-
 .../execution/streaming/StreamExecution.scala   |   4 +-
 .../execution/streaming/StreamingRelation.scala |   4 +-
 .../continuous/ContinuousExecution.scala|   4 +-
 .../continuous/ContinuousRateStreamSource.scala |  17 +--
 .../streaming/sources/RateStreamSourceV2.scala  |  31 -
 .../spark/sql/streaming/DataStreamReader.scala  |  25 -
 .../sql/streaming/StreamingQueryManager.scala   |  24 ++--
 .../execution/streaming/RateSourceV2Suite.scala |  68 +--
 .../apache/spark/sql/streaming/StreamTest.scala |   2 +-
 .../streaming/continuous/ContinuousSuite.scala  |   2 +-
 15 files changed, 241 insertions(+), 79 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
--
diff --git 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
index 6cdfe2f..0259c77 100644
--- 
a/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
+++ 
b/sql/core/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -7,3 +7,4 @@ org.apache.spark.sql.execution.datasources.text.TextFileFormat
 org.apache.spark.sql.execution.streaming.ConsoleSinkProvider
 org.apache.spark.sql.execution.streaming.TextSocketSourceProvider
 org.apache.spark.sql.execution.streaming.RateSourceProvider
+org.apache.spark.sql.execution.streaming.sources.RateSourceProviderV2

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 7eb99a6..cba20dd 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -35,6 +35,16 @@ case class DataSourceV2Relation(
   }
 }
 
+/**
+ * A specialization of DataSourceV2Relation with the streaming bit set to 
true. Otherwise identical
+ * to the non-streaming relation.
+ */
+class StreamingDataSourceV2Relation(
+fullOutput: Seq[AttributeReference],
+reader: DataSourceV2Reader) extends DataSourceV2Relation(fullOutput, 
reader) {
+  override def isStreaming: Boolean = true
+}
+
 object DataSourceV2Relation {
   def apply(reader: DataSourceV2Reader): DataSourceV2Relation = {
 new DataSourceV2Relation(reader.readSchema().toAttributes, reader)

http://git-wip-us.apache.org/repos/asf/spark/blob/4f7e7588/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
--
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index 9a7a13f..42240ee 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Mi

spark git commit: [SPARK-22992][K8S] Remove assumption of the DNS domain

2018-01-08 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 06fd842e3 -> eecd83cb2


[SPARK-22992][K8S] Remove assumption of the DNS domain

## What changes were proposed in this pull request?

Remove the use of the FQDN to access the driver, because it assumes the service is set up in a particular DNS zone (`cluster.local`), which is common but not ubiquitous.
Note that we already access the in-cluster API server through 
`kubernetes.default.svc`, so, by extension, this should work as well.
The alternative is to introduce DNS zones for both of those addresses.
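
For illustration only (the service and namespace names below are made up), the driver host string changes roughly like this:

```scala
// Sketch: the driver's advertised in-cluster hostname before and after this change.
val serviceName = "spark-pi-driver-svc"   // hypothetical driver service name
val namespace   = "my-namespace"

val before = s"$serviceName.$namespace.svc.cluster.local" // assumes the cluster.local zone
val after  = s"$serviceName.$namespace.svc"               // resolves regardless of the zone name
```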

## How was this patch tested?
Unit tests

cc vanzin liyinan926 mridulm mccheah

Author: foxish 

Closes #20187 from foxish/cluster.local.

(cherry picked from commit eed82a0b211352215316ec70dc48aefc013ad0b2)
Signed-off-by: Marcelo Vanzin 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eecd83cb
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eecd83cb
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eecd83cb

Branch: refs/heads/branch-2.3
Commit: eecd83cb2d24907aba303095b052997471247500
Parents: 06fd842
Author: foxish 
Authored: Mon Jan 8 13:01:45 2018 -0800
Committer: Marcelo Vanzin 
Committed: Mon Jan 8 13:01:56 2018 -0800

--
 .../deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala | 2 +-
 .../k8s/submit/steps/DriverServiceBootstrapStepSuite.scala   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eecd83cb/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
--
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
index eb594e4..34af7cd 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
@@ -83,7 +83,7 @@ private[spark] class DriverServiceBootstrapStep(
   .build()
 
 val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
-val driverHostname = 
s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
+val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc"
 val resolvedSparkConf = driverSpec.driverSparkConf.clone()
   .set(DRIVER_HOST_KEY, driverHostname)
   .set("spark.driver.port", driverPort.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/eecd83cb/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
--
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
index 006ce26..78c8c3b 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
@@ -85,7 +85,7 @@ class DriverServiceBootstrapStepSuite extends SparkFunSuite 
with BeforeAndAfter
 val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
 val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
   DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX
-val expectedHostName = 
s"$expectedServiceName.my-namespace.svc.cluster.local"
+val expectedHostName = s"$expectedServiceName.my-namespace.svc"
 verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, 
expectedHostName)
   }
 
@@ -120,7 +120,7 @@ class DriverServiceBootstrapStepSuite extends SparkFunSuite 
with BeforeAndAfter
 val driverService = 
resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
 val expectedServiceName = 
s"spark-1${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}"
 assert(driverService.getMetadata.getName === expectedServiceName)
-val expectedHostName = 
s"$expectedServiceName.my-namespace.svc.cluster.local"
+val expectedHostName = s"$expectedServiceName.my-namespace.svc"
 verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, 
expectedHostName)
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark

spark git commit: [SPARK-22992][K8S] Remove assumption of the DNS domain

2018-01-08 Thread vanzin
Repository: spark
Updated Branches:
  refs/heads/master 40b983c3b -> eed82a0b2


[SPARK-22992][K8S] Remove assumption of the DNS domain

## What changes were proposed in this pull request?

Remove the use of an FQDN to access the driver, because it assumes the cluster is
set up with the `cluster.local` DNS zone, which is common but not ubiquitous.
Note that we already access the in-cluster API server through
`kubernetes.default.svc`, so, by extension, this should work as well.
The alternative is to introduce DNS zones for both of those addresses.

## How was this patch tested?
Unit tests

cc vanzin liyinan926 mridulm mccheah

Author: foxish 

Closes #20187 from foxish/cluster.local.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eed82a0b
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eed82a0b
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eed82a0b

Branch: refs/heads/master
Commit: eed82a0b211352215316ec70dc48aefc013ad0b2
Parents: 40b983c
Author: foxish 
Authored: Mon Jan 8 13:01:45 2018 -0800
Committer: Marcelo Vanzin 
Committed: Mon Jan 8 13:01:45 2018 -0800

--
 .../deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala | 2 +-
 .../k8s/submit/steps/DriverServiceBootstrapStepSuite.scala   | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eed82a0b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
--
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
index eb594e4..34af7cd 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStep.scala
@@ -83,7 +83,7 @@ private[spark] class DriverServiceBootstrapStep(
   .build()
 
 val namespace = sparkConf.get(KUBERNETES_NAMESPACE)
-val driverHostname = 
s"${driverService.getMetadata.getName}.$namespace.svc.cluster.local"
+val driverHostname = s"${driverService.getMetadata.getName}.$namespace.svc"
 val resolvedSparkConf = driverSpec.driverSparkConf.clone()
   .set(DRIVER_HOST_KEY, driverHostname)
   .set("spark.driver.port", driverPort.toString)

http://git-wip-us.apache.org/repos/asf/spark/blob/eed82a0b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
--
diff --git 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
index 006ce26..78c8c3b 100644
--- 
a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
+++ 
b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/deploy/k8s/submit/steps/DriverServiceBootstrapStepSuite.scala
@@ -85,7 +85,7 @@ class DriverServiceBootstrapStepSuite extends SparkFunSuite 
with BeforeAndAfter
 val resolvedDriverSpec = configurationStep.configureDriver(baseDriverSpec)
 val expectedServiceName = SHORT_RESOURCE_NAME_PREFIX +
   DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX
-val expectedHostName = 
s"$expectedServiceName.my-namespace.svc.cluster.local"
+val expectedHostName = s"$expectedServiceName.my-namespace.svc"
 verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, 
expectedHostName)
   }
 
@@ -120,7 +120,7 @@ class DriverServiceBootstrapStepSuite extends SparkFunSuite 
with BeforeAndAfter
 val driverService = 
resolvedDriverSpec.otherKubernetesResources.head.asInstanceOf[Service]
 val expectedServiceName = 
s"spark-1${DriverServiceBootstrapStep.DRIVER_SVC_POSTFIX}"
 assert(driverService.getMetadata.getName === expectedServiceName)
-val expectedHostName = 
s"$expectedServiceName.my-namespace.svc.cluster.local"
+val expectedHostName = s"$expectedServiceName.my-namespace.svc"
 verifySparkConfHostNames(resolvedDriverSpec.driverSparkConf, 
expectedHostName)
   }
 


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark-website git commit: update latest

2018-01-08 Thread srowen
Repository: spark-website
Updated Branches:
  refs/heads/asf-site 8c5354f29 -> 1a8adcaa8


update latest


Project: http://git-wip-us.apache.org/repos/asf/spark-website/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark-website/commit/1a8adcaa
Tree: http://git-wip-us.apache.org/repos/asf/spark-website/tree/1a8adcaa
Diff: http://git-wip-us.apache.org/repos/asf/spark-website/diff/1a8adcaa

Branch: refs/heads/asf-site
Commit: 1a8adcaa872743663f6d429e3a1962f9ce888755
Parents: 8c5354f
Author: Felix Cheung 
Authored: Mon Jan 8 10:28:12 2018 -0800
Committer: Felix Cheung 
Committed: Mon Jan 8 10:28:12 2018 -0800

--
 site/docs/latest | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/spark-website/blob/1a8adcaa/site/docs/latest
--
diff --git a/site/docs/latest b/site/docs/latest
index e3a4f19..fae692e 12
--- a/site/docs/latest
+++ b/site/docs/latest
@@ -1 +1 @@
-2.2.0
\ No newline at end of file
+2.2.1
\ No newline at end of file


-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r24082 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_10_01-06fd842-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-08 Thread pwendell
Author: pwendell
Date: Mon Jan  8 18:18:59 2018
New Revision: 24082

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_08_10_01-06fd842 docs


[This commit notification would consist of 1439 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r24079 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_08_01-40b983c-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-08 Thread pwendell
Author: pwendell
Date: Mon Jan  8 16:20:17 2018
New Revision: 24079

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_08_08_01-40b983c docs


[This commit notification would consist of 1439 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemptNumber

2018-01-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 4a45f0a53 -> 06fd842e3


[SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemptNumber

## What changes were proposed in this pull request?
1. Deprecate `attemptId` in `StageInfo` and add `def attemptNumber() = attemptId`
   (see the sketch below).
2. Replace usages of `stageAttemptId` with `stageAttemptNumber`.
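
A minimal sketch of the deprecation pattern in item 1, assuming a much-simplified `StageInfo` (the real class has more fields, and the since-version string here is an assumption):

```
// Simplified sketch: expose the attempt number under a new name and deprecate
// the old accessor so that call sites migrate to `attemptNumber`.
class StageInfo(val stageId: Int, private val attempt: Int) {
  @deprecated("Use attemptNumber instead", "2.3.0")
  def attemptId: Int = attempt

  def attemptNumber: Int = attempt
}
```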

## How was this patch tested?
I manually checked the compiler deprecation warnings.

Author: Xianjin YE 

Closes #20178 from advancedxy/SPARK-22952.

(cherry picked from commit 40b983c3b44b6771f07302ce87987fa4716b5ebf)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/06fd842e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/06fd842e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/06fd842e

Branch: refs/heads/branch-2.3
Commit: 06fd842e3a120fde1c137e4945bcb747fc71a322
Parents: 4a45f0a
Author: Xianjin YE 
Authored: Mon Jan 8 23:49:07 2018 +0800
Committer: Wenchen Fan 
Committed: Mon Jan 8 23:49:27 2018 +0800

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 15 +++---
 .../org/apache/spark/scheduler/StageInfo.scala  |  4 +-
 .../spark/scheduler/StatsReportListener.scala   |  2 +-
 .../apache/spark/status/AppStatusListener.scala |  7 +--
 .../org/apache/spark/status/LiveEntity.scala|  4 +-
 .../spark/ui/scope/RDDOperationGraph.scala  |  2 +-
 .../org/apache/spark/util/JsonProtocol.scala|  2 +-
 .../spark/status/AppStatusListenerSuite.scala   | 54 +++-
 .../sql/execution/ui/SQLAppStatusListener.scala |  2 +-
 9 files changed, 51 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/06fd842e/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c2498d4..199937b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -815,7 +815,8 @@ class DAGScheduler(
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
 // Note that there is a chance that this task is launched after the stage 
is cancelled.
 // In that case, we wouldn't have the stage anymore in stageIdToStage.
-val stageAttemptId = 
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
+val stageAttemptId =
+  
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
 listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, 
taskInfo))
   }
 
@@ -1050,7 +1051,7 @@ class DAGScheduler(
 val locs = taskIdToLocations(id)
 val part = stage.rdd.partitions(id)
 stage.pendingPartitions += id
-new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
+new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
   taskBinary, part, locs, properties, serializedTaskMetrics, 
Option(jobId),
   Option(sc.applicationId), sc.applicationAttemptId)
   }
@@ -1060,7 +1061,7 @@ class DAGScheduler(
 val p: Int = stage.partitions(id)
 val part = stage.rdd.partitions(p)
 val locs = taskIdToLocations(id)
-new ResultTask(stage.id, stage.latestInfo.attemptId,
+new ResultTask(stage.id, stage.latestInfo.attemptNumber,
   taskBinary, part, locs, id, properties, serializedTaskMetrics,
   Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
   }
@@ -1076,7 +1077,7 @@ class DAGScheduler(
   logInfo(s"Submitting ${tasks.size} missing tasks from $stage 
(${stage.rdd}) (first 15 " +
 s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
   taskScheduler.submitTasks(new TaskSet(
-tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, 
properties))
+tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, 
properties))
 } else {
   // Because we posted SparkListenerStageSubmitted earlier, we should mark
   // the stage as completed here in case there are no tasks to run
@@ -1245,7 +1246,7 @@ class DAGScheduler(
 val status = event.result.asInstanceOf[MapStatus]
 val execId = status.location.executorId
 logDebug("ShuffleMapTask finished on " + execId)
-if (stageIdToStage(task.stageId).latestInfo.attemptId == 
task.stageAttemptId) {
+if (stageIdToStage(task.stageId).latestInfo.attemptNumber == 
task.stageAttemptId) {
   // This task was for the currently running attempt of the stage. 

spark git commit: [SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemptNumber

2018-01-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master eb45b52e8 -> 40b983c3b


[SPARK-22952][CORE] Deprecate stageAttemptId in favour of stageAttemptNumber

## What changes were proposed in this pull request?
1. Deprecate `attemptId` in `StageInfo` and add `def attemptNumber() = attemptId`.
2. Replace usages of `stageAttemptId` with `stageAttemptNumber`.

## How was this patch tested?
I manually checked the compiler deprecation warnings.

Author: Xianjin YE 

Closes #20178 from advancedxy/SPARK-22952.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/40b983c3
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/40b983c3
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/40b983c3

Branch: refs/heads/master
Commit: 40b983c3b44b6771f07302ce87987fa4716b5ebf
Parents: eb45b52
Author: Xianjin YE 
Authored: Mon Jan 8 23:49:07 2018 +0800
Committer: Wenchen Fan 
Committed: Mon Jan 8 23:49:07 2018 +0800

--
 .../apache/spark/scheduler/DAGScheduler.scala   | 15 +++---
 .../org/apache/spark/scheduler/StageInfo.scala  |  4 +-
 .../spark/scheduler/StatsReportListener.scala   |  2 +-
 .../apache/spark/status/AppStatusListener.scala |  7 +--
 .../org/apache/spark/status/LiveEntity.scala|  4 +-
 .../spark/ui/scope/RDDOperationGraph.scala  |  2 +-
 .../org/apache/spark/util/JsonProtocol.scala|  2 +-
 .../spark/status/AppStatusListenerSuite.scala   | 54 +++-
 .../sql/execution/ui/SQLAppStatusListener.scala |  2 +-
 9 files changed, 51 insertions(+), 41 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/40b983c3/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
--
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index c2498d4..199937b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -815,7 +815,8 @@ class DAGScheduler(
   private[scheduler] def handleBeginEvent(task: Task[_], taskInfo: TaskInfo) {
 // Note that there is a chance that this task is launched after the stage 
is cancelled.
 // In that case, we wouldn't have the stage anymore in stageIdToStage.
-val stageAttemptId = 
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptId).getOrElse(-1)
+val stageAttemptId =
+  
stageIdToStage.get(task.stageId).map(_.latestInfo.attemptNumber).getOrElse(-1)
 listenerBus.post(SparkListenerTaskStart(task.stageId, stageAttemptId, 
taskInfo))
   }
 
@@ -1050,7 +1051,7 @@ class DAGScheduler(
 val locs = taskIdToLocations(id)
 val part = stage.rdd.partitions(id)
 stage.pendingPartitions += id
-new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
+new ShuffleMapTask(stage.id, stage.latestInfo.attemptNumber,
   taskBinary, part, locs, properties, serializedTaskMetrics, 
Option(jobId),
   Option(sc.applicationId), sc.applicationAttemptId)
   }
@@ -1060,7 +1061,7 @@ class DAGScheduler(
 val p: Int = stage.partitions(id)
 val part = stage.rdd.partitions(p)
 val locs = taskIdToLocations(id)
-new ResultTask(stage.id, stage.latestInfo.attemptId,
+new ResultTask(stage.id, stage.latestInfo.attemptNumber,
   taskBinary, part, locs, id, properties, serializedTaskMetrics,
   Option(jobId), Option(sc.applicationId), sc.applicationAttemptId)
   }
@@ -1076,7 +1077,7 @@ class DAGScheduler(
   logInfo(s"Submitting ${tasks.size} missing tasks from $stage 
(${stage.rdd}) (first 15 " +
 s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
   taskScheduler.submitTasks(new TaskSet(
-tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, 
properties))
+tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, 
properties))
 } else {
   // Because we posted SparkListenerStageSubmitted earlier, we should mark
   // the stage as completed here in case there are no tasks to run
@@ -1245,7 +1246,7 @@ class DAGScheduler(
 val status = event.result.asInstanceOf[MapStatus]
 val execId = status.location.executorId
 logDebug("ShuffleMapTask finished on " + execId)
-if (stageIdToStage(task.stageId).latestInfo.attemptId == 
task.stageAttemptId) {
+if (stageIdToStage(task.stageId).latestInfo.attemptNumber == 
task.stageAttemptId) {
   // This task was for the currently running attempt of the stage. 
Since the task
   // completed successfully from the perspective of the 
TaskSetManager, mark 

svn commit: r24074 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_06_01-4a45f0a-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-08 Thread pwendell
Author: pwendell
Date: Mon Jan  8 14:14:57 2018
New Revision: 24074

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_08_06_01-4a45f0a docs


[This commit notification would consist of 1439 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r24071 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_05_06-eb45b52-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-08 Thread pwendell
Author: pwendell
Date: Mon Jan  8 13:19:57 2018
New Revision: 24071

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_08_05_06-eb45b52 docs


[This commit notification would consist of 1439 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-21865][SQL] simplify the distribution semantic of Spark SQL

2018-01-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 6964dfe47 -> 4a45f0a53


[SPARK-21865][SQL] simplify the distribution semantic of Spark SQL

## What changes were proposed in this pull request?

**The current shuffle planning logic**

1. Each operator specifies the distribution requirements for its children, via 
the `Distribution` interface.
2. Each operator specifies its output partitioning, via the `Partitioning` 
interface.
3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a 
`Distribution`.
4. For each operator, check each of its children and add a shuffle node above a
child if that child's partitioning cannot satisfy the required distribution.
5. For each operator, check whether its children's output partitionings are
compatible with each other, via `Partitioning.compatibleWith`.
6. If the check in 5 fails, add a shuffle above each child.
7. Try to eliminate the shuffles added in 6, via `Partitioning.guarantees`.

This design has a major problem with the definition of "compatible".

`Partitioning.compatibleWith` is not well defined: ideally a `Partitioning`
can't know whether it's compatible with another `Partitioning` without more
information from the operator. For example, in `t1 join t2 on t1.a = t2.b`,
`HashPartitioning(a, 10)` should be compatible with `HashPartitioning(b, 10)`,
but the partitioning itself doesn't know that.

As a result, `Partitioning.compatibleWith` currently always returns false except
for literals, which makes it almost useless. This also means that if an operator
has distribution requirements for multiple children, Spark always adds shuffle
nodes to all the children (although some of them can be eliminated). However,
there is no guarantee that the children's output partitionings are compatible
with each other after adding these shuffles; we just assume that the operator
will only specify `ClusteredDistribution` for multiple children.

I think it's very hard to guarantee co-partitioning of children for all kinds of
operators, and we cannot even give a clear definition of co-partitioning between
distributions like `ClusteredDistribution(a,b)` and `ClusteredDistribution(c)`.

I think we should drop the "compatible" concept from the distribution model, and
let the operator achieve the co-partitioning requirement through specialized
distribution requirements.

**Proposed shuffle planning logic after this PR**
(The first 4 are same as before)
1. Each operator specifies the distribution requirements for its children, via 
the `Distribution` interface.
2. Each operator specifies its output partitioning, via the `Partitioning` 
interface.
3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a 
`Distribution`.
4. For each operator, check each of its children and add a shuffle node above a
child if that child's partitioning cannot satisfy the required distribution.
5. For each operator, check whether its children's output partitionings have the
same number of partitions.
6. If the check in 5 fails, pick the maximum number of partitions among the
children's output partitionings, and add a shuffle above every child whose
number of partitions does not match that maximum.

The new distribution model is very simple: we only have one kind of
relationship, `Partitioning.satisfy`. For multiple children, Spark only
guarantees that they have the same number of partitions, and it's the operator's
responsibility to leverage this guarantee to achieve more complicated
requirements. For example, non-broadcast joins can use the newly added
`HashPartitionedDistribution` to achieve co-partitioning.
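
To make the `Partitioning.satisfy` relationship concrete, here is a toy model of the simplified contract; the names and signatures are illustrative assumptions, not Spark's actual classes:

```
// Toy model: an operator states a required Distribution per child; a child's
// Partitioning either satisfies it or the planner inserts a shuffle above it.
sealed trait Distribution
case object UnspecifiedDistribution extends Distribution
case class ClusteredDistribution(clustering: Seq[String]) extends Distribution

sealed trait Partitioning {
  def numPartitions: Int
  def satisfies(required: Distribution): Boolean
}

case class HashPartitioning(keys: Seq[String], numPartitions: Int) extends Partitioning {
  def satisfies(required: Distribution): Boolean = required match {
    case UnspecifiedDistribution        => true
    // Hash keys must be a subset of the required clustering keys.
    case ClusteredDistribution(cluster) => keys.nonEmpty && keys.forall(cluster.contains)
  }
}
```

In this model, a shuffle is added above any child whose partitioning does not satisfy the distribution its parent requires; for multiple children, only the number of partitions is aligned, as described above.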

## How was this patch tested?

existing tests.

Author: Wenchen Fan 

Closes #19080 from cloud-fan/exchange.

(cherry picked from commit eb45b52e826ea9cea48629760db35ef87f91fea0)
Signed-off-by: Wenchen Fan 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4a45f0a5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4a45f0a5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4a45f0a5

Branch: refs/heads/branch-2.3
Commit: 4a45f0a532216736f8874417c5cbd7912ca13db5
Parents: 6964dfe
Author: Wenchen Fan 
Authored: Mon Jan 8 19:41:41 2018 +0800
Committer: Wenchen Fan 
Committed: Mon Jan 8 19:42:28 2018 +0800

--
 .../catalyst/plans/physical/partitioning.scala  | 286 +++
 .../spark/sql/catalyst/PartitioningSuite.scala  |  55 
 .../apache/spark/sql/execution/SparkPlan.scala  |  16 +-
 .../execution/exchange/EnsureRequirements.scala | 120 +++-
 .../execution/joins/ShuffledHashJoinExec.scala  |   2 +-
 .../sql/execution/joins/SortMergeJoinExec.scala |   2 +-
 .../apache/spark/sql/execution/objects.scala|   2 +-
 .../spark/sql/execution/PlannerSuite.scala  |  81 ++
 8 files changed, 194 insertions(+), 370 deletions(-)
--


http://git-wip-us.apache

spark git commit: [SPARK-21865][SQL] simplify the distribution semantic of Spark SQL

2018-01-08 Thread wenchen
Repository: spark
Updated Branches:
  refs/heads/master 2c73d2a94 -> eb45b52e8


[SPARK-21865][SQL] simplify the distribution semantic of Spark SQL

## What changes were proposed in this pull request?

**The current shuffle planning logic**

1. Each operator specifies the distribution requirements for its children, via 
the `Distribution` interface.
2. Each operator specifies its output partitioning, via the `Partitioning` 
interface.
3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a 
`Distribution`.
4. For each operator, check each of its children and add a shuffle node above a
child if that child's partitioning cannot satisfy the required distribution.
5. For each operator, check whether its children's output partitionings are
compatible with each other, via `Partitioning.compatibleWith`.
6. If the check in 5 fails, add a shuffle above each child.
7. Try to eliminate the shuffles added in 6, via `Partitioning.guarantees`.

This design has a major problem with the definition of "compatible".

`Partitioning.compatibleWith` is not well defined: ideally a `Partitioning`
can't know whether it's compatible with another `Partitioning` without more
information from the operator. For example, in `t1 join t2 on t1.a = t2.b`,
`HashPartitioning(a, 10)` should be compatible with `HashPartitioning(b, 10)`,
but the partitioning itself doesn't know that.

As a result, `Partitioning.compatibleWith` currently always returns false except
for literals, which makes it almost useless. This also means that if an operator
has distribution requirements for multiple children, Spark always adds shuffle
nodes to all the children (although some of them can be eliminated). However,
there is no guarantee that the children's output partitionings are compatible
with each other after adding these shuffles; we just assume that the operator
will only specify `ClusteredDistribution` for multiple children.

I think it's very hard to guarantee co-partitioning of children for all kinds of
operators, and we cannot even give a clear definition of co-partitioning between
distributions like `ClusteredDistribution(a,b)` and `ClusteredDistribution(c)`.

I think we should drop the "compatible" concept from the distribution model, and
let the operator achieve the co-partitioning requirement through specialized
distribution requirements.

**Proposed shuffle planning logic after this PR**
(The first 4 are same as before)
1. Each operator specifies the distribution requirements for its children, via 
the `Distribution` interface.
2. Each operator specifies its output partitioning, via the `Partitioning` 
interface.
3. `Partitioning.satisfy` determines whether a `Partitioning` can satisfy a 
`Distribution`.
4. For each operator, check each of its children and add a shuffle node above a
child if that child's partitioning cannot satisfy the required distribution.
5. For each operator, check whether its children's output partitionings have the
same number of partitions.
6. If the check in 5 fails, pick the maximum number of partitions among the
children's output partitionings, and add a shuffle above every child whose
number of partitions does not match that maximum.

The new distribution model is very simple: we only have one kind of
relationship, `Partitioning.satisfy`. For multiple children, Spark only
guarantees that they have the same number of partitions, and it's the operator's
responsibility to leverage this guarantee to achieve more complicated
requirements. For example, non-broadcast joins can use the newly added
`HashPartitionedDistribution` to achieve co-partitioning.

## How was this patch tested?

existing tests.

Author: Wenchen Fan 

Closes #19080 from cloud-fan/exchange.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/eb45b52e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/eb45b52e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/eb45b52e

Branch: refs/heads/master
Commit: eb45b52e826ea9cea48629760db35ef87f91fea0
Parents: 2c73d2a
Author: Wenchen Fan 
Authored: Mon Jan 8 19:41:41 2018 +0800
Committer: Wenchen Fan 
Committed: Mon Jan 8 19:41:41 2018 +0800

--
 .../catalyst/plans/physical/partitioning.scala  | 286 +++
 .../spark/sql/catalyst/PartitioningSuite.scala  |  55 
 .../apache/spark/sql/execution/SparkPlan.scala  |  16 +-
 .../execution/exchange/EnsureRequirements.scala | 120 +++-
 .../execution/joins/ShuffledHashJoinExec.scala  |   2 +-
 .../sql/execution/joins/SortMergeJoinExec.scala |   2 +-
 .../apache/spark/sql/execution/objects.scala|   2 +-
 .../spark/sql/execution/PlannerSuite.scala  |  81 ++
 8 files changed, 194 insertions(+), 370 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/eb45b52e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physica

svn commit: r24067 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_02_01-6964dfe-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-08 Thread pwendell
Author: pwendell
Date: Mon Jan  8 10:15:29 2018
New Revision: 24067

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_08_02_01-6964dfe docs


[This commit notification would consist of 1439 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



svn commit: r24065 - in /dev/spark/2.3.0-SNAPSHOT-2018_01_08_00_01-8fdeb4b-docs: ./ _site/ _site/api/ _site/api/R/ _site/api/java/ _site/api/java/lib/ _site/api/java/org/ _site/api/java/org/apache/ _s

2018-01-08 Thread pwendell
Author: pwendell
Date: Mon Jan  8 08:16:44 2018
New Revision: 24065

Log:
Apache Spark 2.3.0-SNAPSHOT-2018_01_08_00_01-8fdeb4b docs


[This commit notification would consist of 1439 parts, 
which exceeds the limit of 50 ones, so it was shortened to the summary.]

-
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org



spark git commit: [SPARK-22983] Don't push filters beneath aggregates with empty grouping expressions

2018-01-08 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.2 41f705a57 -> 7c30ae39f


[SPARK-22983] Don't push filters beneath aggregates with empty grouping 
expressions

## What changes were proposed in this pull request?

The following SQL query should return zero rows, but in Spark it actually 
returns one row:

```
SELECT 1 from (
  SELECT 1 AS z,
  MIN(a.x)
  FROM (select 1 as x) a
  WHERE false
) b
where b.z != b.z
```

The problem stems from the `PushDownPredicate` rule: when this rule encounters
a filter on top of an Aggregate operator, e.g. `Filter(Agg(...))`, it removes
the original filter and adds a new filter onto the Aggregate's child, e.g.
`Agg(Filter(...))`. This is sometimes okay, but the case above is a
counterexample: because there is no explicit `GROUP BY`, we are implicitly
computing a global aggregate over the entire table, so the original filter was
not acting like a `HAVING` clause that filters the number of groups. If we push
this filter down, it fails to actually reduce the cardinality of the Aggregate's
output, leading to the wrong answer.
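
For illustration, a self-contained sketch of why the pushdown is unsound here, using plain Scala collections as a toy stand-in for the relation (this is an assumed model, not the optimizer itself):

```
// The inner relation after `WHERE false`: no rows.
val rows = Seq.empty[Int]

// Filter kept above the global aggregate (correct): the aggregate still emits
// one row for empty input, and the outer filter (z != z, i.e. false) drops it.
val filterAbove = Seq((1, rows.reduceOption(_ min _))).filter { case (z, _) => z != z }

// Filter pushed beneath the global aggregate (the buggy rewrite): the pushed
// predicate removes input rows, but the single aggregate row itself survives.
val filterBelow = Seq((1, rows.filter(_ => false).reduceOption(_ min _)))

assert(filterAbove.isEmpty)   // 0 rows: the expected answer
assert(filterBelow.size == 1) // 1 row: the wrong answer described above
```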

In 2016 I fixed a similar problem involving invalid pushdowns of
data-independent filters (filters which reference no columns of the filtered
relation). There was additional discussion after my fix was merged which pointed
out that my patch was an incomplete fix (see #15289), but it looks like I must
have either misunderstood the comment or forgotten to follow up on the
additional points raised there.

This patch fixes the problem by choosing to never push down filters in cases 
where there are no grouping expressions. Since there are no grouping keys, the 
only columns are aggregate columns and we can't push filters defined over 
aggregate results, so this change won't cause us to miss out on any legitimate 
pushdown opportunities.

## How was this patch tested?

New regression tests in `SQLQueryTestSuite` and `FilterPushdownSuite`.

Author: Josh Rosen 

Closes #20180 from 
JoshRosen/SPARK-22983-dont-push-filters-beneath-aggs-with-empty-grouping-expressions.

(cherry picked from commit 2c73d2a948bdde798aaf0f87c18846281deb05fd)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7c30ae39
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7c30ae39
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7c30ae39

Branch: refs/heads/branch-2.2
Commit: 7c30ae39f57ef0c42173b52aa405027b44e0ad9f
Parents: 41f705a
Author: Josh Rosen 
Authored: Mon Jan 8 16:04:03 2018 +0800
Committer: gatorsmile 
Committed: Mon Jan 8 16:05:04 2018 +0800

--
 .../spark/sql/catalyst/optimizer/Optimizer.scala|  3 ++-
 .../catalyst/optimizer/FilterPushdownSuite.scala| 13 +
 .../test/resources/sql-tests/inputs/group-by.sql|  9 +
 .../resources/sql-tests/results/group-by.sql.out| 16 +++-
 4 files changed, 39 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/7c30ae39/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 82bd759..fe66821 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -754,7 +754,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with 
PredicateHelper {
   project.copy(child = Filter(replaceAlias(condition, aliasMap), 
grandChild))
 
 case filter @ Filter(condition, aggregate: Aggregate)
-  if aggregate.aggregateExpressions.forall(_.deterministic) =>
+  if aggregate.aggregateExpressions.forall(_.deterministic)
+&& aggregate.groupingExpressions.nonEmpty =>
   // Find all the aliased expressions in the aggregate list that don't 
include any actual
   // AggregateExpression, and create a map from the alias to the expression
   val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/7c30ae39/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index d4d281e..4d41354 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apac

spark git commit: [SPARK-22983] Don't push filters beneath aggregates with empty grouping expressions

2018-01-08 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/branch-2.3 8bf24e9fe -> 6964dfe47


[SPARK-22983] Don't push filters beneath aggregates with empty grouping 
expressions

## What changes were proposed in this pull request?

The following SQL query should return zero rows, but in Spark it actually 
returns one row:

```
SELECT 1 from (
  SELECT 1 AS z,
  MIN(a.x)
  FROM (select 1 as x) a
  WHERE false
) b
where b.z != b.z
```

The problem stems from the `PushDownPredicate` rule: when this rule encounters
a filter on top of an Aggregate operator, e.g. `Filter(Agg(...))`, it removes
the original filter and adds a new filter onto the Aggregate's child, e.g.
`Agg(Filter(...))`. This is sometimes okay, but the case above is a
counterexample: because there is no explicit `GROUP BY`, we are implicitly
computing a global aggregate over the entire table, so the original filter was
not acting like a `HAVING` clause that filters the number of groups. If we push
this filter down, it fails to actually reduce the cardinality of the Aggregate's
output, leading to the wrong answer.

In 2016 I fixed a similar problem involving invalid pushdowns of
data-independent filters (filters which reference no columns of the filtered
relation). There was additional discussion after my fix was merged which pointed
out that my patch was an incomplete fix (see #15289), but it looks like I must
have either misunderstood the comment or forgotten to follow up on the
additional points raised there.

This patch fixes the problem by choosing to never push down filters in cases 
where there are no grouping expressions. Since there are no grouping keys, the 
only columns are aggregate columns and we can't push filters defined over 
aggregate results, so this change won't cause us to miss out on any legitimate 
pushdown opportunities.

## How was this patch tested?

New regression tests in `SQLQueryTestSuite` and `FilterPushdownSuite`.

Author: Josh Rosen 

Closes #20180 from 
JoshRosen/SPARK-22983-dont-push-filters-beneath-aggs-with-empty-grouping-expressions.

(cherry picked from commit 2c73d2a948bdde798aaf0f87c18846281deb05fd)
Signed-off-by: gatorsmile 


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6964dfe4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6964dfe4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6964dfe4

Branch: refs/heads/branch-2.3
Commit: 6964dfe47b2090e542b26cd64e27420ec3eb1a3d
Parents: 8bf24e9
Author: Josh Rosen 
Authored: Mon Jan 8 16:04:03 2018 +0800
Committer: gatorsmile 
Committed: Mon Jan 8 16:04:28 2018 +0800

--
 .../spark/sql/catalyst/optimizer/Optimizer.scala|  3 ++-
 .../catalyst/optimizer/FilterPushdownSuite.scala| 13 +
 .../test/resources/sql-tests/inputs/group-by.sql|  9 +
 .../resources/sql-tests/results/group-by.sql.out| 16 +++-
 4 files changed, 39 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/6964dfe4/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0d4b02c..df0af82 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -795,7 +795,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with 
PredicateHelper {
   project.copy(child = Filter(replaceAlias(condition, aliasMap), 
grandChild))
 
 case filter @ Filter(condition, aggregate: Aggregate)
-  if aggregate.aggregateExpressions.forall(_.deterministic) =>
+  if aggregate.aggregateExpressions.forall(_.deterministic)
+&& aggregate.groupingExpressions.nonEmpty =>
   // Find all the aliased expressions in the aggregate list that don't 
include any actual
   // AggregateExpression, and create a map from the alias to the expression
   val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/6964dfe4/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 85a5e97..82a1025 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apac

spark git commit: [SPARK-22983] Don't push filters beneath aggregates with empty grouping expressions

2018-01-08 Thread lixiao
Repository: spark
Updated Branches:
  refs/heads/master 8fdeb4b99 -> 2c73d2a94


[SPARK-22983] Don't push filters beneath aggregates with empty grouping 
expressions

## What changes were proposed in this pull request?

The following SQL query should return zero rows, but in Spark it actually 
returns one row:

```
SELECT 1 from (
  SELECT 1 AS z,
  MIN(a.x)
  FROM (select 1 as x) a
  WHERE false
) b
where b.z != b.z
```

The problem stems from the `PushDownPredicate` rule: when this rule encounters
a filter on top of an Aggregate operator, e.g. `Filter(Agg(...))`, it removes
the original filter and adds a new filter onto the Aggregate's child, e.g.
`Agg(Filter(...))`. This is sometimes okay, but the case above is a
counterexample: because there is no explicit `GROUP BY`, we are implicitly
computing a global aggregate over the entire table, so the original filter was
not acting like a `HAVING` clause that filters the number of groups. If we push
this filter down, it fails to actually reduce the cardinality of the Aggregate's
output, leading to the wrong answer.

In 2016 I fixed a similar problem involving invalid pushdowns of
data-independent filters (filters which reference no columns of the filtered
relation). There was additional discussion after my fix was merged which pointed
out that my patch was an incomplete fix (see #15289), but it looks like I must
have either misunderstood the comment or forgotten to follow up on the
additional points raised there.

This patch fixes the problem by choosing to never push down filters in cases 
where there are no grouping expressions. Since there are no grouping keys, the 
only columns are aggregate columns and we can't push filters defined over 
aggregate results, so this change won't cause us to miss out on any legitimate 
pushdown opportunities.

## How was this patch tested?

New regression tests in `SQLQueryTestSuite` and `FilterPushdownSuite`.

Author: Josh Rosen 

Closes #20180 from 
JoshRosen/SPARK-22983-dont-push-filters-beneath-aggs-with-empty-grouping-expressions.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/2c73d2a9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/2c73d2a9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/2c73d2a9

Branch: refs/heads/master
Commit: 2c73d2a948bdde798aaf0f87c18846281deb05fd
Parents: 8fdeb4b
Author: Josh Rosen 
Authored: Mon Jan 8 16:04:03 2018 +0800
Committer: gatorsmile 
Committed: Mon Jan 8 16:04:03 2018 +0800

--
 .../spark/sql/catalyst/optimizer/Optimizer.scala|  3 ++-
 .../catalyst/optimizer/FilterPushdownSuite.scala| 13 +
 .../test/resources/sql-tests/inputs/group-by.sql|  9 +
 .../resources/sql-tests/results/group-by.sql.out| 16 +++-
 4 files changed, 39 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/spark/blob/2c73d2a9/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
--
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index 0d4b02c..df0af82 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -795,7 +795,8 @@ object PushDownPredicate extends Rule[LogicalPlan] with 
PredicateHelper {
   project.copy(child = Filter(replaceAlias(condition, aliasMap), 
grandChild))
 
 case filter @ Filter(condition, aggregate: Aggregate)
-  if aggregate.aggregateExpressions.forall(_.deterministic) =>
+  if aggregate.aggregateExpressions.forall(_.deterministic)
+&& aggregate.groupingExpressions.nonEmpty =>
   // Find all the aliased expressions in the aggregate list that don't 
include any actual
   // AggregateExpression, and create a map from the alias to the expression
   val aliasMap = AttributeMap(aggregate.aggregateExpressions.collect {

http://git-wip-us.apache.org/repos/asf/spark/blob/2c73d2a9/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
--
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
index 85a5e97..82a1025 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/FilterPushdownSuite.scala
@@ -809,6 +809,19 @@ class FilterPushdownSuite