[spark] branch branch-2.3 updated: [SPARK-26422][R] Support to disable Hive support in SparkR even for Hadoop versions unsupported by Hive fork

2018-12-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new b4aeb81  [SPARK-26422][R] Support to disable Hive support in SparkR 
even for Hadoop versions unsupported by Hive fork
b4aeb81 is described below

commit b4aeb819163268f8d28723e763a9d26da37edc5e
Author: Hyukjin Kwon 
AuthorDate: Fri Dec 21 16:09:30 2018 +0800

[SPARK-26422][R] Support to disable Hive support in SparkR even for Hadoop 
versions unsupported by Hive fork

## What changes were proposed in this pull request?

Currently, even if I explicitly disable Hive support in a SparkR session as below:

```r
sparkSession <- sparkR.session("local[4]", "SparkR", Sys.getenv("SPARK_HOME"),
                               enableHiveSupport = FALSE)
```

it still produces the following error when the Hadoop version is not supported by our Hive fork:

```
java.lang.reflect.InvocationTargetException
...
Caused by: java.lang.IllegalArgumentException: Unrecognized Hadoop major version number: 3.1.1.3.1.0.0-78
    at org.apache.hadoop.hive.shims.ShimLoader.getMajorVersion(ShimLoader.java:174)
    at org.apache.hadoop.hive.shims.ShimLoader.loadShims(ShimLoader.java:139)
    at org.apache.hadoop.hive.shims.ShimLoader.getHadoopShims(ShimLoader.java:100)
    at org.apache.hadoop.hive.conf.HiveConf$ConfVars.<clinit>(HiveConf.java:368)
    ... 43 more
Error in handleErrors(returnStatus, conn) :
  java.lang.ExceptionInInitializerError
    at org.apache.hadoop.hive.conf.HiveConf.<clinit>(HiveConf.java:105)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:193)
    at org.apache.spark.sql.SparkSession$.hiveClassesArePresent(SparkSession.scala:1116)
    at org.apache.spark.sql.api.r.SQLUtils$.getOrCreateSparkSession(SQLUtils.scala:52)
    at org.apache.spark.sql.api.r.SQLUtils.getOrCreateSparkSession(SQLUtils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
```

The root cause is that

```
SparkSession.hiveClassesArePresent
```

checks whether the class is loadable to see if it is on the classpath, but `org.apache.hadoop.hive.conf.HiveConf` validates the Hadoop version in static initialization logic that runs right away. This throws an `IllegalArgumentException`, which is not caught:


https://github.com/apache/spark/blob/36edbac1c8337a4719f90e4abd58d38738b2e1fb/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L1113-L1121
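
To make the failure mode concrete, here is a minimal sketch of the loadability check (the `looksLoadable` helper is hypothetical, not the exact `Utils.classForName` code): loading a class also runs its static initializers, so an error raised there escapes a catch that only handles missing classes.

```scala
// A class whose static initializer fails surfaces as ExceptionInInitializerError,
// which is neither ClassNotFoundException nor NoClassDefFoundError, so it
// propagates out of this check instead of returning false.
def looksLoadable(className: String): Boolean =
  try {
    Class.forName(className, true, Thread.currentThread().getContextClassLoader)
    true
  } catch {
    case _: ClassNotFoundException | _: NoClassDefFoundError => false
  }
```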

So, currently, if users have a Hive-enabled Spark build with a Hadoop version unsupported by our Hive fork (namely 3+), there is no way to use SparkR even though it could work.

This PR proposes to change the order of the boolean comparison so that we do not execute `SparkSession.hiveClassesArePresent` when:

  1. `enableHiveSupport` is explicitly disabled
  2. `spark.sql.catalogImplementation` is `in-memory`

In other words, short-circuiting ensures we **only** check `SparkSession.hiveClassesArePresent` when Hive support is explicitly enabled, as in the sketch below.
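
Here is a minimal Scala sketch of the reordered, short-circuiting condition (the `shouldEnableHive` helper and its parameters are illustrative; the real change is in the diff at the end of this message):

```scala
import java.util.Locale

// `hiveClassesArePresent` is by-name, so it is only evaluated when the cheaper
// checks in front of it succeed; with Hive support disabled or an in-memory
// catalog, HiveConf's static Hadoop-version check never runs.
def shouldEnableHive(
    enableHiveSupport: Boolean,
    catalogImplementation: String,
    hiveClassesArePresent: => Boolean): Boolean = {
  enableHiveSupport &&
    catalogImplementation.toLowerCase(Locale.ROOT) == "hive" &&
    hiveClassesArePresent
}
```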

## How was this patch tested?

It's difficult to write a test since we don't run tests against Hadoop 3 
yet. See https://github.com/apache/spark/pull/21588. Manually tested.

Closes #23356 from HyukjinKwon/SPARK-26422.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 305e9b5ad22b428501fd42d3730d73d2e09ad4c5)
Signed-off-by: Hyukjin Kwon 
---
 .../src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index af20764..4c71795 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -49,9 +49,17 @@ private[sql] object SQLUtils extends Logging {
   sparkConfigMap: JMap[Object, Object],
   enableHiveSupport: Boolean): SparkSession = {
 val spark =
-  if (SparkSession.hiveClassesArePresent && enableHiveSupport &&
+  if (enableHiveSupport &&
   jsc.sc.conf.get(CATALOG_IMPLEMENTATION.key, 
"hive").toLowerCase(Locale.ROOT) ==
-"hive") {
+"hive" &&
+  // Note that the order of conditions here are on purpose.
+  // `SparkSession.hiveClassesArePresent` checks if Hive's `HiveConf` 
is loadable or not;
+  // however, `HiveConf` itself has some static logic to check if 
Hadoop version is
+  // supported or not, which throws an `IllegalArgumentException` if 

[spark] branch branch-2.4 updated: [SPARK-26422][R] Support to disable Hive support in SparkR even for Hadoop versions unsupported by Hive fork

2018-12-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
 new 90a14d5  [SPARK-26422][R] Support to disable Hive support in SparkR 
even for Hadoop versions unsupported by Hive fork
90a14d5 is described below

commit 90a14d58b4e87a603e35a9ab679f6049b10e9c7b
Author: Hyukjin Kwon 
AuthorDate: Fri Dec 21 16:09:30 2018 +0800

[SPARK-26422][R] Support to disable Hive support in SparkR even for Hadoop 
versions unsupported by Hive fork

## What changes were proposed in this pull request?

Currently, even if I explicitly disable Hive support in a SparkR session as below:

```r
sparkSession <- sparkR.session("local[4]", "SparkR", Sys.getenv("SPARK_HOME"),
                               enableHiveSupport = FALSE)
```

it still produces the following error when the Hadoop version is not supported by our Hive fork:

```
java.lang.reflect.InvocationTargetException
...
Caused by: java.lang.IllegalArgumentException: Unrecognized Hadoop major version number: 3.1.1.3.1.0.0-78
    at org.apache.hadoop.hive.shims.ShimLoader.getMajorVersion(ShimLoader.java:174)
    at org.apache.hadoop.hive.shims.ShimLoader.loadShims(ShimLoader.java:139)
    at org.apache.hadoop.hive.shims.ShimLoader.getHadoopShims(ShimLoader.java:100)
    at org.apache.hadoop.hive.conf.HiveConf$ConfVars.<clinit>(HiveConf.java:368)
    ... 43 more
Error in handleErrors(returnStatus, conn) :
  java.lang.ExceptionInInitializerError
    at org.apache.hadoop.hive.conf.HiveConf.<clinit>(HiveConf.java:105)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:193)
    at org.apache.spark.sql.SparkSession$.hiveClassesArePresent(SparkSession.scala:1116)
    at org.apache.spark.sql.api.r.SQLUtils$.getOrCreateSparkSession(SQLUtils.scala:52)
    at org.apache.spark.sql.api.r.SQLUtils.getOrCreateSparkSession(SQLUtils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
```

The root cause is that

```
SparkSession.hiveClassesArePresent
```

checks whether the class is loadable to see if it is on the classpath, but `org.apache.hadoop.hive.conf.HiveConf` validates the Hadoop version in static initialization logic that runs right away. This throws an `IllegalArgumentException`, which is not caught:


https://github.com/apache/spark/blob/36edbac1c8337a4719f90e4abd58d38738b2e1fb/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L1113-L1121

So, currently, if users have a Hive-enabled Spark build with a Hadoop version unsupported by our Hive fork (namely 3+), there is no way to use SparkR even though it could work.

This PR proposes to change the order of the boolean comparison so that we do not execute `SparkSession.hiveClassesArePresent` when:

  1. `enableHiveSupport` is explicitly disabled
  2. `spark.sql.catalogImplementation` is `in-memory`

In other words, short-circuiting ensures we **only** check `SparkSession.hiveClassesArePresent` when Hive support is explicitly enabled.

## How was this patch tested?

It's difficult to write a test since we don't run tests against Hadoop 3 
yet. See https://github.com/apache/spark/pull/21588. Manually tested.

Closes #23356 from HyukjinKwon/SPARK-26422.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
(cherry picked from commit 305e9b5ad22b428501fd42d3730d73d2e09ad4c5)
Signed-off-by: Hyukjin Kwon 
---
 .../src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index af20764..4c71795 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -49,9 +49,17 @@ private[sql] object SQLUtils extends Logging {
   sparkConfigMap: JMap[Object, Object],
   enableHiveSupport: Boolean): SparkSession = {
 val spark =
-  if (SparkSession.hiveClassesArePresent && enableHiveSupport &&
+  if (enableHiveSupport &&
   jsc.sc.conf.get(CATALOG_IMPLEMENTATION.key, 
"hive").toLowerCase(Locale.ROOT) ==
-"hive") {
+"hive" &&
+  // Note that the order of conditions here are on purpose.
+  // `SparkSession.hiveClassesArePresent` checks if Hive's `HiveConf` 
is loadable or not;
+  // however, `HiveConf` itself has some static logic to check if 
Hadoop version is
+  // supported or not, which throws an `IllegalArgumentException` if 

[spark] branch master updated: [SPARK-26422][R] Support to disable Hive support in SparkR even for Hadoop versions unsupported by Hive fork

2018-12-21 Thread gurwls223
This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 305e9b5  [SPARK-26422][R] Support to disable Hive support in SparkR 
even for Hadoop versions unsupported by Hive fork
305e9b5 is described below

commit 305e9b5ad22b428501fd42d3730d73d2e09ad4c5
Author: Hyukjin Kwon 
AuthorDate: Fri Dec 21 16:09:30 2018 +0800

[SPARK-26422][R] Support to disable Hive support in SparkR even for Hadoop 
versions unsupported by Hive fork

## What changes were proposed in this pull request?

Currently, even if I explicitly disable Hive support in a SparkR session as below:

```r
sparkSession <- sparkR.session("local[4]", "SparkR", Sys.getenv("SPARK_HOME"),
                               enableHiveSupport = FALSE)
```

it still produces the following error when the Hadoop version is not supported by our Hive fork:

```
java.lang.reflect.InvocationTargetException
...
Caused by: java.lang.IllegalArgumentException: Unrecognized Hadoop major version number: 3.1.1.3.1.0.0-78
    at org.apache.hadoop.hive.shims.ShimLoader.getMajorVersion(ShimLoader.java:174)
    at org.apache.hadoop.hive.shims.ShimLoader.loadShims(ShimLoader.java:139)
    at org.apache.hadoop.hive.shims.ShimLoader.getHadoopShims(ShimLoader.java:100)
    at org.apache.hadoop.hive.conf.HiveConf$ConfVars.<clinit>(HiveConf.java:368)
    ... 43 more
Error in handleErrors(returnStatus, conn) :
  java.lang.ExceptionInInitializerError
    at org.apache.hadoop.hive.conf.HiveConf.<clinit>(HiveConf.java:105)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:348)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:193)
    at org.apache.spark.sql.SparkSession$.hiveClassesArePresent(SparkSession.scala:1116)
    at org.apache.spark.sql.api.r.SQLUtils$.getOrCreateSparkSession(SQLUtils.scala:52)
    at org.apache.spark.sql.api.r.SQLUtils.getOrCreateSparkSession(SQLUtils.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
```

The root cause is that

```
SparkSession.hiveClassesArePresent
```

checks whether the class is loadable to see if it is on the classpath, but `org.apache.hadoop.hive.conf.HiveConf` validates the Hadoop version in static initialization logic that runs right away. This throws an `IllegalArgumentException`, which is not caught:


https://github.com/apache/spark/blob/36edbac1c8337a4719f90e4abd58d38738b2e1fb/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala#L1113-L1121

So, currently, if users have a Hive-enabled Spark build with a Hadoop version unsupported by our Hive fork (namely 3+), there is no way to use SparkR even though it could work.

This PR proposes to change the order of the boolean comparison so that we do not execute `SparkSession.hiveClassesArePresent` when:

  1. `enableHiveSupport` is explicitly disabled
  2. `spark.sql.catalogImplementation` is `in-memory`

In other words, short-circuiting ensures we **only** check `SparkSession.hiveClassesArePresent` when Hive support is explicitly enabled.

## How was this patch tested?

It's difficult to write a test since we don't run tests against Hadoop 3 
yet. See https://github.com/apache/spark/pull/21588. Manually tested.

Closes #23356 from HyukjinKwon/SPARK-26422.

Authored-by: Hyukjin Kwon 
Signed-off-by: Hyukjin Kwon 
---
 .../src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
index becb05c..e98cab8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/r/SQLUtils.scala
@@ -49,9 +49,17 @@ private[sql] object SQLUtils extends Logging {
   sparkConfigMap: JMap[Object, Object],
   enableHiveSupport: Boolean): SparkSession = {
 val spark =
-  if (SparkSession.hiveClassesArePresent && enableHiveSupport &&
+  if (enableHiveSupport &&
   jsc.sc.conf.get(CATALOG_IMPLEMENTATION.key, 
"hive").toLowerCase(Locale.ROOT) ==
-"hive") {
+"hive" &&
+  // Note that the order of conditions here are on purpose.
+  // `SparkSession.hiveClassesArePresent` checks if Hive's `HiveConf` 
is loadable or not;
+  // however, `HiveConf` itself has some static logic to check if 
Hadoop version is
+  // supported or not, which throws an `IllegalArgumentException` if 
unsupported.
+  // If this is checked first, there's no way to disable Hive support 
in the case above.
+  // 

[spark] branch branch-2.3 updated: [SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False

2018-12-21 Thread lixiao
This is an automated email from the ASF dual-hosted git repository.

lixiao pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new a7d50ae  [SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter 
should consider NULL as False
a7d50ae is described below

commit a7d50ae24a5f92e8d9b6622436f0bb4c2e06cbe1
Author: Marco Gaido 
AuthorDate: Fri Dec 21 14:52:29 2018 -0800

[SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider 
NULL as False

## What changes were proposed in this pull request?

In `ReplaceExceptWithFilter` we do not properly consider the case in which the condition returns NULL. In that case, since negating NULL still returns NULL, the assumption that negating the condition returns all the rows which didn't satisfy it does not hold: rows for which the condition evaluates to NULL may not be returned. This happens when the constraints inferred by `InferFiltersFromConstraints` are not enough, as with `OR` conditions.

The rule also had problems with non-deterministic conditions: in such a scenario, it would change the probability of the output.

The PR fixes these problems by (see the sketch after this list):
 - returning False for the condition when it is NULL (in this way we do return all the rows which didn't satisfy it);
 - avoiding any transformation when the condition is non-deterministic.
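
A minimal Catalyst-style sketch of the NULL handling (mirroring the `transformCondition` change in the diff below; `left` and `rewrittenCondition` stand for the left plan and the already-rewritten right-side filter condition):

```scala
import org.apache.spark.sql.catalyst.expressions.{Coalesce, Expression, Literal, Not}
import org.apache.spark.sql.catalyst.plans.logical.{Distinct, Filter, LogicalPlan}

// Wrapping the condition in Coalesce(cond, false) makes NULL behave as false,
// so Not(...) keeps the rows for which the original condition was NULL --
// exactly the rows the Except right side would not have removed.
def exceptAsFilter(left: LogicalPlan, rewrittenCondition: Expression): LogicalPlan = {
  val nullSafeCondition = Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral))
  Distinct(Filter(Not(nullSafeCondition), left))
}
```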

## How was this patch tested?

added UTs

Closes #23350 from mgaido91/SPARK-26366_2.3.

Authored-by: Marco Gaido 
Signed-off-by: gatorsmile 
---
 .../optimizer/ReplaceExceptWithFilter.scala| 32 +---
 .../catalyst/optimizer/ReplaceOperatorSuite.scala  | 44 --
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 11 ++
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 38 +++
 4 files changed, 101 insertions(+), 24 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index 45edf26..08cf160 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -36,7 +36,8 @@ import org.apache.spark.sql.catalyst.rules.Rule
  * Note:
  * Before flipping the filter condition of the right node, we should:
  * 1. Combine all it's [[Filter]].
- * 2. Apply InferFiltersFromConstraints rule (to take into account of NULL 
values in the condition).
+ * 2. Update the attribute references to the left node;
+ * 3. Add a Coalesce(condition, False) (to take into account of NULL values in 
the condition).
  */
 object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
@@ -47,23 +48,28 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
 plan.transform {
   case e @ Except(left, right) if isEligible(left, right) =>
-val newCondition = transformCondition(left, skipProject(right))
-newCondition.map { c =>
-  Distinct(Filter(Not(c), left))
-}.getOrElse {
+val filterCondition = 
combineFilters(skipProject(right)).asInstanceOf[Filter].condition
+if (filterCondition.deterministic) {
+  transformCondition(left, filterCondition).map { c =>
+Distinct(Filter(Not(c), left))
+  }.getOrElse {
+e
+  }
+} else {
   e
 }
 }
   }
 
-  private def transformCondition(left: LogicalPlan, right: LogicalPlan): 
Option[Expression] = {
-val filterCondition =
-  
InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
-
-val attributeNameMap: Map[String, Attribute] = left.output.map(x => 
(x.name, x)).toMap
-
-if (filterCondition.references.forall(r => 
attributeNameMap.contains(r.name))) {
-  Some(filterCondition.transform { case a: AttributeReference => 
attributeNameMap(a.name) })
+  private def transformCondition(plan: LogicalPlan, condition: Expression): 
Option[Expression] = {
+val attributeNameMap: Map[String, Attribute] = plan.output.map(x => 
(x.name, x)).toMap
+if (condition.references.forall(r => attributeNameMap.contains(r.name))) {
+  val rewrittenCondition = condition.transform {
+case a: AttributeReference => attributeNameMap(a.name)
+  }
+  // We need to consider as False when the condition is NULL, otherwise we 
do not return those
+  // rows containing NULL which are instead filtered in the Except right 
plan
+  Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral)))
 } else {
   None
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
 

[spark] branch master updated: [SPARK-26216][SQL][FOLLOWUP] use abstract class instead of trait for UserDefinedFunction

2018-12-21 Thread wenchen
This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new bba506f  [SPARK-26216][SQL][FOLLOWUP] use abstract class instead of 
trait for UserDefinedFunction
bba506f is described below

commit bba506f8f454c7a8fa82e93a1728e02428fe0d35
Author: Wenchen Fan 
AuthorDate: Sat Dec 22 10:16:27 2018 +0800

[SPARK-26216][SQL][FOLLOWUP] use abstract class instead of trait for 
UserDefinedFunction

## What changes were proposed in this pull request?

A follow-up of https://github.com/apache/spark/pull/23178, to keep binary compatibility by using an abstract class.
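
A minimal sketch of why the distinction matters (the name `UserDefinedFunctionLike` is hypothetical): a Scala trait compiles to a JVM interface while an abstract class compiles to a JVM class, so callers compiled against Spark 2.4's `UserDefinedFunction` class fail with `IncompatibleClassChangeError` if it becomes a trait, but keep working against an abstract class.

```scala
// Compiles to a JVM class, so existing bytecode that expects a class
// (invokevirtual on its methods) still links against it.
abstract class UserDefinedFunctionLike {
  def deterministic: Boolean
  def asNonNullable(): UserDefinedFunctionLike
}
```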

## How was this patch tested?

Manual test. I created a simple app with Spark 2.4
```
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.udf

object TryUDF {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    import spark.implicits._
    val f1 = udf((i: Int) => i + 1)
    println(f1.deterministic)
    spark.range(10).select(f1.asNonNullable().apply($"id")).show()
    spark.stop()
  }
}
```

When I run it with current master, it fails with
```
java.lang.IncompatibleClassChangeError: Found interface org.apache.spark.sql.expressions.UserDefinedFunction, but class was expected
```

When I run it with this PR, it works

Closes #23351 from cloud-fan/minor.

Authored-by: Wenchen Fan 
Signed-off-by: Wenchen Fan 
---
 docs/sql-migration-guide-upgrade.md|  2 --
 project/MimaExcludes.scala | 28 +-
 .../sql/expressions/UserDefinedFunction.scala  |  2 +-
 3 files changed, 28 insertions(+), 4 deletions(-)

diff --git a/docs/sql-migration-guide-upgrade.md 
b/docs/sql-migration-guide-upgrade.md
index 115fc65..1bd3b5a 100644
--- a/docs/sql-migration-guide-upgrade.md
+++ b/docs/sql-migration-guide-upgrade.md
@@ -33,8 +33,6 @@ displayTitle: Spark SQL Upgrading Guide
 
   - In Spark version 2.4 and earlier, the `SET` command works without any 
warnings even if the specified key is for `SparkConf` entries and it has no 
effect because the command does not update `SparkConf`, but the behavior might 
confuse users. Since 3.0, the command fails if a `SparkConf` key is used. You 
can disable such a check by setting 
`spark.sql.legacy.setCommandRejectsSparkCoreConfs` to `false`.
 
-  - Spark applications which are built with Spark version 2.4 and prior, and 
call methods of `UserDefinedFunction`, need to be re-compiled with Spark 3.0, 
as they are not binary compatible with Spark 3.0.
-
   - Since Spark 3.0, CSV/JSON datasources use java.time API for parsing and 
generating CSV/JSON content. In Spark version 2.4 and earlier, 
java.text.SimpleDateFormat is used for the same purpuse with fallbacks to the 
parsing mechanisms of Spark 2.0 and 1.x. For example, `2018-12-08 10:39:21.123` 
with the pattern `-MM-dd'T'HH:mm:ss.SSS` cannot be parsed since Spark 3.0 
because the timestamp does not match to the pattern but it can be parsed by 
earlier Spark versions due to a fallback  [...]
 
   - In Spark version 2.4 and earlier, CSV datasource converts a malformed CSV 
string to a row with all `null`s in the PERMISSIVE mode. Since Spark 3.0, the 
returned row can contain non-`null` fields if some of CSV column values were 
parsed and converted to desired types successfully.
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 7bb70a2..89fc53c 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -241,7 +241,33 @@ object MimaExcludes {
 
 // [SPARK-26216][SQL] Do not use case class as public API 
(UserDefinedFunction)
 
ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.expressions.UserDefinedFunction$"),
-
ProblemFilters.exclude[IncompatibleTemplateDefProblem]("org.apache.spark.sql.expressions.UserDefinedFunction")
+
ProblemFilters.exclude[AbstractClassProblem]("org.apache.spark.sql.expressions.UserDefinedFunction"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.inputTypes"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.nullableTypes_="),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.dataType"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.f"),
+
ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.this"),
+
ProblemFilters.exclude[DirectAbstractMethodProblem]("org.apache.spark.sql.expressions.UserDefinedFunction.asNonNullable"),
+

[spark] branch branch-2.3 updated: Revert "[SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should consider NULL as False"

2018-12-21 Thread dongjoon
This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-2.3
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.3 by this push:
 new d9d3bea  Revert "[SPARK-26366][SQL][BACKPORT-2.3] 
ReplaceExceptWithFilter should consider NULL as False"
d9d3bea is described below

commit d9d3beafad8dbee5d21f062a181343b8640d2ccd
Author: Dongjoon Hyun 
AuthorDate: Fri Dec 21 19:57:07 2018 -0800

Revert "[SPARK-26366][SQL][BACKPORT-2.3] ReplaceExceptWithFilter should 
consider NULL as False"

This reverts commit a7d50ae24a5f92e8d9b6622436f0bb4c2e06cbe1.
---
 .../optimizer/ReplaceExceptWithFilter.scala| 32 +++-
 .../catalyst/optimizer/ReplaceOperatorSuite.scala  | 44 ++
 .../scala/org/apache/spark/sql/DatasetSuite.scala  | 11 --
 .../scala/org/apache/spark/sql/SQLQuerySuite.scala | 38 ---
 4 files changed, 24 insertions(+), 101 deletions(-)

diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
index 08cf160..45edf26 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceExceptWithFilter.scala
@@ -36,8 +36,7 @@ import org.apache.spark.sql.catalyst.rules.Rule
  * Note:
  * Before flipping the filter condition of the right node, we should:
  * 1. Combine all it's [[Filter]].
- * 2. Update the attribute references to the left node;
- * 3. Add a Coalesce(condition, False) (to take into account of NULL values in 
the condition).
+ * 2. Apply InferFiltersFromConstraints rule (to take into account of NULL 
values in the condition).
  */
 object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
@@ -48,28 +47,23 @@ object ReplaceExceptWithFilter extends Rule[LogicalPlan] {
 
 plan.transform {
   case e @ Except(left, right) if isEligible(left, right) =>
-val filterCondition = 
combineFilters(skipProject(right)).asInstanceOf[Filter].condition
-if (filterCondition.deterministic) {
-  transformCondition(left, filterCondition).map { c =>
-Distinct(Filter(Not(c), left))
-  }.getOrElse {
-e
-  }
-} else {
+val newCondition = transformCondition(left, skipProject(right))
+newCondition.map { c =>
+  Distinct(Filter(Not(c), left))
+}.getOrElse {
   e
 }
 }
   }
 
-  private def transformCondition(plan: LogicalPlan, condition: Expression): 
Option[Expression] = {
-val attributeNameMap: Map[String, Attribute] = plan.output.map(x => 
(x.name, x)).toMap
-if (condition.references.forall(r => attributeNameMap.contains(r.name))) {
-  val rewrittenCondition = condition.transform {
-case a: AttributeReference => attributeNameMap(a.name)
-  }
-  // We need to consider as False when the condition is NULL, otherwise we 
do not return those
-  // rows containing NULL which are instead filtered in the Except right 
plan
-  Some(Coalesce(Seq(rewrittenCondition, Literal.FalseLiteral)))
+  private def transformCondition(left: LogicalPlan, right: LogicalPlan): 
Option[Expression] = {
+val filterCondition =
+  
InferFiltersFromConstraints(combineFilters(right)).asInstanceOf[Filter].condition
+
+val attributeNameMap: Map[String, Attribute] = left.output.map(x => 
(x.name, x)).toMap
+
+if (filterCondition.references.forall(r => 
attributeNameMap.contains(r.name))) {
+  Some(filterCondition.transform { case a: AttributeReference => 
attributeNameMap(a.name) })
 } else {
   None
 }
diff --git 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
index 78d3969..52dc2e9 100644
--- 
a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
+++ 
b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/ReplaceOperatorSuite.scala
@@ -20,12 +20,11 @@ package org.apache.spark.sql.catalyst.optimizer
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.dsl.expressions._
 import org.apache.spark.sql.catalyst.dsl.plans._
-import org.apache.spark.sql.catalyst.expressions.{Alias, Coalesce, If, 
Literal, Not}
+import org.apache.spark.sql.catalyst.expressions.{Alias, Literal, Not}
 import org.apache.spark.sql.catalyst.expressions.aggregate.First
 import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi, PlanTest}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
-import org.apache.spark.sql.types.BooleanType
 
 class 

[spark] branch master updated: [SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka

2018-12-21 Thread zsxwing
This is an automated email from the ASF dual-hosted git repository.

zsxwing pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8e76d66  [SPARK-26267][SS] Retry when detecting incorrect offsets from 
Kafka
8e76d66 is described below

commit 8e76d6621aaddb8b73443b14ea2c6eebe9089893
Author: Shixiong Zhu 
AuthorDate: Fri Dec 21 10:41:25 2018 -0800

[SPARK-26267][SS] Retry when detecting incorrect offsets from Kafka

## What changes were proposed in this pull request?

Due to [KAFKA-7703](https://issues.apache.org/jira/browse/KAFKA-7703), Kafka may return an earliest offset when we request a latest offset. This will cause Spark to reprocess data.

As per the suggestion in KAFKA-7703, we put a `position` call between `poll` and `seekToEnd` to block the fetch request triggered by `poll` before calling `seekToEnd`.

In addition, to avoid other unknown issues, we also use the previously known offsets to audit the latest offsets returned by Kafka. If we find some incorrect offsets (a latest offset less than an offset in `knownOffsets`), we will retry at most `maxOffsetFetchAttempts` times, as sketched below.
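
A minimal sketch of that audit-and-retry idea (the helper names and the `fetch` closure are illustrative, not the exact `KafkaOffsetReader` API):

```scala
import org.apache.kafka.common.TopicPartition
import scala.annotation.tailrec

// A latest offset smaller than a previously known offset for the same
// partition indicates Kafka returned a stale/incorrect value.
def incorrectOffsets(
    latest: Map[TopicPartition, Long],
    known: Map[TopicPartition, Long]): Iterable[TopicPartition] =
  known.collect { case (tp, k) if latest.get(tp).exists(_ < k) => tp }

@tailrec
def fetchLatestWithRetry(
    fetch: () => Map[TopicPartition, Long],
    known: Map[TopicPartition, Long],
    attemptsLeft: Int): Map[TopicPartition, Long] = {
  val latest = fetch()
  if (incorrectOffsets(latest, known).isEmpty || attemptsLeft <= 1) latest
  else fetchLatestWithRetry(fetch, known, attemptsLeft - 1)
}
```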

## How was this patch tested?

Jenkins

Closes #23324 from zsxwing/SPARK-26267.

Authored-by: Shixiong Zhu 
Signed-off-by: Shixiong Zhu 
---
 .../sql/kafka010/KafkaContinuousReadSupport.scala  |  4 +-
 .../sql/kafka010/KafkaMicroBatchReadSupport.scala  | 19 +++--
 .../sql/kafka010/KafkaOffsetRangeCalculator.scala  |  2 +
 .../spark/sql/kafka010/KafkaOffsetReader.scala | 80 --
 .../apache/spark/sql/kafka010/KafkaSource.scala|  5 +-
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 48 +
 6 files changed, 145 insertions(+), 13 deletions(-)

diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
index 1753a28..02dfb9c 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReadSupport.scala
@@ -60,7 +60,7 @@ class KafkaContinuousReadSupport(
   override def initialOffset(): Offset = {
 val offsets = initialOffsets match {
   case EarliestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
-  case LatestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+  case LatestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchLatestOffsets(None))
   case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, 
reportDataLoss)
 }
 logInfo(s"Initial offsets: $offsets")
@@ -107,7 +107,7 @@ class KafkaContinuousReadSupport(
 
   override def needsReconfiguration(config: ScanConfig): Boolean = {
 val knownPartitions = 
config.asInstanceOf[KafkaContinuousScanConfig].knownPartitions
-offsetReader.fetchLatestOffsets().keySet != knownPartitions
+offsetReader.fetchLatestOffsets(None).keySet != knownPartitions
   }
 
   override def toString(): String = s"KafkaSource[$offsetReader]"
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
index bb4de67..b4f042e 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReadSupport.scala
@@ -84,7 +84,7 @@ private[kafka010] class KafkaMicroBatchReadSupport(
 
   override def latestOffset(start: Offset): Offset = {
 val startPartitionOffsets = 
start.asInstanceOf[KafkaSourceOffset].partitionToOffsets
-val latestPartitionOffsets = kafkaOffsetReader.fetchLatestOffsets()
+val latestPartitionOffsets = 
kafkaOffsetReader.fetchLatestOffsets(Some(startPartitionOffsets))
 endPartitionOffsets = KafkaSourceOffset(maxOffsetsPerTrigger.map { 
maxOffsets =>
   rateLimit(maxOffsets, startPartitionOffsets, latestPartitionOffsets)
 }.getOrElse {
@@ -133,10 +133,21 @@ private[kafka010] class KafkaMicroBatchReadSupport(
 }.toSeq
 logDebug("TopicPartitions: " + topicPartitions.mkString(", "))
 
+val fromOffsets = startPartitionOffsets ++ newPartitionInitialOffsets
+val untilOffsets = endPartitionOffsets
+untilOffsets.foreach { case (tp, untilOffset) =>
+  fromOffsets.get(tp).foreach { fromOffset =>
+if (untilOffset < fromOffset) {
+  reportDataLoss(s"Partition $tp's offset was changed from " +
+s"$fromOffset to $untilOffset, some data may have been missed")
+}
+  }
+}
+
 

[spark] branch master updated: [SPARK-26269][YARN] YarnAllocator should have same blacklist behaviour with YARN to maximize use of cluster resource

2018-12-21 Thread tgraves
This is an automated email from the ASF dual-hosted git repository.

tgraves pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new d6a5f85  [SPARK-26269][YARN] YarnAllocator should have same blacklist behaviour with YARN to maximize use of cluster resource
d6a5f85 is described below

commit d6a5f859848bbd237e19075dd26e1547fb3af417
Author: wuyi 
AuthorDate: Fri Dec 21 13:21:58 2018 -0600

[SPARK-26269][YARN] YarnAllocator should have same blacklist behaviour with YARN to maximize use of cluster resource

## What changes were proposed in this pull request?

As I mentioned in JIRA [SPARK-26269](https://issues.apache.org/jira/browse/SPARK-26269), in order to maximize the use of cluster resources, this PR tries to make `YarnAllocator` have the same blacklist behaviour as YARN (see the sketch below).
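
A minimal sketch of the classification this introduces, using the exit-status set added in the diff below (the `exitCausedByApp` helper is illustrative): exits with these statuses are not counted as app-caused failures and do not trigger the allocator's blacklist handling, matching how YARN itself treats them.

```scala
import org.apache.hadoop.yarn.api.records.ContainerExitStatus

// Exit statuses that are not the application's (or the node's) fault.
val notAppAndSystemFaultExitStatuses: Set[Int] = Set(
  ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
  ContainerExitStatus.KILLED_BY_APPMASTER,
  ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
  ContainerExitStatus.ABORTED,
  ContainerExitStatus.DISKS_FAILED)

// Only exits outside that set are treated as app-caused failures and fed
// into the allocator's blacklist tracking.
def exitCausedByApp(exitStatus: Int): Boolean =
  !notAppAndSystemFaultExitStatuses.contains(exitStatus)
```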

## How was this patch tested?

Added.

Closes #23223 from 
Ngone51/dev-YarnAllocator-should-have-same-blacklist-behaviour-with-YARN.

Lead-authored-by: wuyi 
Co-authored-by: Ngone51 
Signed-off-by: Thomas Graves 
---
 .../apache/spark/deploy/yarn/YarnAllocator.scala   | 32 +++--
 .../yarn/YarnAllocatorBlacklistTracker.scala   |  4 +-
 .../yarn/YarnAllocatorBlacklistTrackerSuite.scala  |  2 +-
 .../spark/deploy/yarn/YarnAllocatorSuite.scala | 75 +-
 4 files changed, 101 insertions(+), 12 deletions(-)

diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 54b1ec2..a3feca5 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -607,13 +607,23 @@ private[yarn] class YarnAllocator(
 val message = "Container killed by YARN for exceeding physical 
memory limits. " +
   s"$diag Consider boosting ${EXECUTOR_MEMORY_OVERHEAD.key}."
 (true, message)
-  case _ =>
-// all the failures which not covered above, like:
-// disk failure, kill by app master or resource manager, ...
-allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
-(true, "Container marked as failed: " + containerId + onHostStr +
-  ". Exit status: " + completedContainer.getExitStatus +
-  ". Diagnostics: " + completedContainer.getDiagnostics)
+  case other_exit_status =>
+// SPARK-26269: follow YARN's blacklisting behaviour(see 
https://github
+// 
.com/apache/hadoop/blob/228156cfd1b474988bc4fedfbf7edddc87db41e3/had
+// 
oop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/ap
+// ache/hadoop/yarn/util/Apps.java#L273 for details)
+if 
(NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS.contains(other_exit_status)) {
+  (false, s"Container marked as failed: $containerId$onHostStr" +
+s". Exit status: ${completedContainer.getExitStatus}" +
+s". Diagnostics: ${completedContainer.getDiagnostics}.")
+} else {
+  // completed container from a bad node
+  
allocatorBlacklistTracker.handleResourceAllocationFailure(hostOpt)
+  (true, s"Container from a bad node: $containerId$onHostStr" +
+s". Exit status: ${completedContainer.getExitStatus}" +
+s". Diagnostics: ${completedContainer.getDiagnostics}.")
+}
+
 
 }
 if (exitCausedByApp) {
@@ -739,4 +749,12 @@ private object YarnAllocator {
   val MEM_REGEX = "[0-9.]+ [KMG]B"
   val VMEM_EXCEEDED_EXIT_CODE = -103
   val PMEM_EXCEEDED_EXIT_CODE = -104
+
+  val NOT_APP_AND_SYSTEM_FAULT_EXIT_STATUS = Set(
+ContainerExitStatus.KILLED_BY_RESOURCEMANAGER,
+ContainerExitStatus.KILLED_BY_APPMASTER,
+ContainerExitStatus.KILLED_AFTER_APP_COMPLETION,
+ContainerExitStatus.ABORTED,
+ContainerExitStatus.DISKS_FAILED
+  )
 }
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
index ceac7cd..268976b 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocatorBlacklistTracker.scala
@@ -120,7 +120,9 @@ private[spark] class YarnAllocatorBlacklistTracker(
 if (removals.nonEmpty) {
   logInfo(s"removing nodes from YARN application master's blacklist: 
$removals")
 }
-amClient.updateBlacklist(additions.asJava, removals.asJava)
+if (additions.nonEmpty || removals.nonEmpty) {
+  

[spark] branch master updated: [SPARK-25642][YARN] Adding two new metrics to record the number of registered connections as well as the number of active connections to YARN Shuffle Service

2018-12-21 Thread vanzin
This is an automated email from the ASF dual-hosted git repository.

vanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
 new 8dd29fe  [SPARK-25642][YARN] Adding two new metrics to record the 
number of registered connections as well as the number of active connections to 
YARN Shuffle Service
8dd29fe is described below

commit 8dd29fe36b781d115213b1d6a8446ad04e9239bb
Author: pgandhi 
AuthorDate: Fri Dec 21 11:28:22 2018 -0800

[SPARK-25642][YARN] Adding two new metrics to record the number of 
registered connections as well as the number of active connections to YARN 
Shuffle Service

Recently, the ability to expose the metrics for YARN Shuffle Service was 
added as part of [SPARK-18364](https://github.com/apache/spark/pull/22485). We 
need to add some metrics to be able to determine the number of active 
connections as well as open connections to the external shuffle service to 
benchmark network and connection issues on large cluster environments.

Added two more shuffle server metrics for the Spark YARN shuffle service: `numRegisteredConnections`, which indicates the number of registered connections to the shuffle service, and `numActiveConnections`, which indicates the number of active connections to the shuffle service at any given point in time.
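
A minimal sketch of how such counters are typically wired with the Codahale/Dropwizard metrics library the shuffle service uses (the handler methods here are illustrative, not the exact `TransportChannelHandler` API):

```scala
import com.codahale.metrics.MetricRegistry

val registry = new MetricRegistry()
// Cumulative count of connections registered with the service (in this sketch).
val registeredConnections = registry.counter("numRegisteredConnections")
// Point-in-time count of currently open connections.
val activeConnections = registry.counter("numActiveConnections")

def onChannelActive(): Unit = {
  registeredConnections.inc()
  activeConnections.inc()
}

def onChannelInactive(): Unit = activeConnections.dec()
```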

If these metrics are output to a file, we get something like this:

1533674653489 default.shuffleService: Hostname=server1.abc.com, 
openBlockRequestLatencyMillis_count=729, 
openBlockRequestLatencyMillis_rate15=0.7110833548897356, 
openBlockRequestLatencyMillis_rate5=1.657808981793011, 
openBlockRequestLatencyMillis_rate1=2.2404486061620474, 
openBlockRequestLatencyMillis_rateMean=0.9242558551196706,
numRegisteredConnections=35,
blockTransferRateBytes_count=2635880512, 
blockTransferRateBytes_rate15=2578547.6094160094, 
blockTransferRateBytes_rate5=6048721.726302424, 
blockTransferRateBytes_rate1=8548922.518223226, 
blockTransferRateBytes_rateMean=3341878.633637769, registeredExecutorsSize=5, 
registerExecutorRequestLatencyMillis_count=5, 
registerExecutorRequestLatencyMillis_rate15=0.0027973949328659836, 
registerExecutorRequestLatencyMillis_rate5=0.0021278007987206426, 
registerExecutorRequestLatencyMillis_rate1=2. [...]

Closes #22498 from pgandhi999/SPARK-18364.

Authored-by: pgandhi 
Signed-off-by: Marcelo Vanzin 
---
 .../org/apache/spark/network/TransportContext.java |  9 +++-
 .../network/server/TransportChannelHandler.java| 18 +++-
 .../spark/network/server/TransportServer.java  |  5 +
 .../shuffle/ExternalShuffleBlockHandler.java   | 24 --
 .../spark/network/yarn/YarnShuffleService.java | 21 +++
 .../network/yarn/YarnShuffleServiceMetrics.java|  5 +
 .../spark/deploy/ExternalShuffleService.scala  |  2 ++
 .../yarn/YarnShuffleServiceMetricsSuite.scala  |  3 ++-
 8 files changed, 73 insertions(+), 14 deletions(-)

diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
 
b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
index 480b526..1a3f3f2 100644
--- 
a/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
+++ 
b/common/network-common/src/main/java/org/apache/spark/network/TransportContext.java
@@ -20,6 +20,7 @@ package org.apache.spark.network;
 import java.util.ArrayList;
 import java.util.List;
 
+import com.codahale.metrics.Counter;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.EventLoopGroup;
@@ -66,6 +67,8 @@ public class TransportContext {
   private final RpcHandler rpcHandler;
   private final boolean closeIdleConnections;
   private final boolean isClientOnly;
+  // Number of registered connections to the shuffle service
+  private Counter registeredConnections = new Counter();
 
   /**
* Force to create MessageEncoder and MessageDecoder so that we can make 
sure they will be created
@@ -221,7 +224,7 @@ public class TransportContext {
 TransportRequestHandler requestHandler = new 
TransportRequestHandler(channel, client,
   rpcHandler, conf.maxChunksBeingTransferred());
 return new TransportChannelHandler(client, responseHandler, requestHandler,
-  conf.connectionTimeoutMs(), closeIdleConnections);
+  conf.connectionTimeoutMs(), closeIdleConnections, this);
   }
 
   /**
@@ -234,4 +237,8 @@ public class TransportContext {
   }
 
   public TransportConf getConf() { return conf; }
+
+  public Counter getRegisteredConnections() {
+return registeredConnections;
+  }
 }
diff --git 
a/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
 
b/common/network-common/src/main/java/org/apache/spark/network/server/TransportChannelHandler.java
index