[GitHub] spark issue #22226: [SPARK-25252][SQL] Support arrays of any types by to_jso...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22226
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22226: [SPARK-25252][SQL] Support arrays of any types by to_jso...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22226
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95671/
Test PASSed.


---




[GitHub] spark issue #22226: [SPARK-25252][SQL] Support arrays of any types by to_jso...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22226
  
**[Test build #95671 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95671/testReport)** for PR 22226 at commit [`90c9687`](https://github.com/apache/spark/commit/90c968772d74c8aadc7a4d0e74e226554b921486).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread dilipbiswal
Github user dilipbiswal commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214962083
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
--- End diff --

@mgaido91 
> This is, indeed, arguable. I think that letting the user choose is a good 
> idea. If the user runs the query and gets an AnalysisException because he/she 
> is trying to perform a cartesian product, he/she can decide: ok, I am doing 
> the wrong thing, let's change it; or he/she can say, well, one of my 2 tables 
> involved contains 10 rows, not a big deal, I want to perform it nonetheless, 
> let's set spark.sql.crossJoin.enabled=true and run it.

Sounds reasonable.


---




[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22282
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22282
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95672/
Test PASSed.


---




[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22282
  
**[Test build #95672 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95672/testReport)** for PR 22282 at commit [`253a894`](https://github.com/apache/spark/commit/253a894bedbd3a9642e529ad937dcb99dae346c7).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22282
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95670/
Test PASSed.


---




[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22282
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22282
  
**[Test build #95670 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95670/testReport)** for PR 22282 at commit [`2254009`](https://github.com/apache/spark/commit/22540092f7a786585afeb5a861b1c329722e3d0b).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22309: [SPARK-20384][CORE] Support value class in schema of Dat...

2018-09-04 Thread mt40
Github user mt40 commented on the issue:

https://github.com/apache/spark/pull/22309
  
@cloud-fan @liancheng @marmbrus could you please take a look at this and 
start the tests?


---




[GitHub] spark issue #18906: [SPARK-21692][PYSPARK][SQL] Add nullability support to P...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18906
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95677/
Test FAILed.


---




[GitHub] spark issue #18906: [SPARK-21692][PYSPARK][SQL] Add nullability support to P...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/18906
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #18906: [SPARK-21692][PYSPARK][SQL] Add nullability support to P...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18906
  
**[Test build #95677 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95677/testReport)** for PR 18906 at commit [`dcf3f07`](https://github.com/apache/spark/commit/dcf3f0767df1801a3bb6f679226fda18a69c72aa).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22228: [SPARK-25124][ML]VectorSizeHint setSize and getSize don'...

2018-09-04 Thread wangyum
Github user wangyum commented on the issue:

https://github.com/apache/spark/pull/22228
  
This is already merged. @huaxingao, could you please close this PR?


---




[GitHub] spark issue #22221: [SPARK-25231] : Fix synchronization of executor heartbea...

2018-09-04 Thread tgravescs
Github user tgravescs commented on the issue:

https://github.com/apache/spark/pull/22221
  
+1 


---




[GitHub] spark pull request #22329: [SPARK-25328][PYTHON] Add an example for having t...

2018-09-04 Thread icexelloss
Github user icexelloss commented on a diff in the pull request:

https://github.com/apache/spark/pull/22329#discussion_r214940744
  
--- Diff: python/pyspark/sql/functions.py ---
@@ -2804,6 +2804,20 @@ def pandas_udf(f=None, returnType=None, functionType=None):
|  1|1.5|
|  2|6.0|
+---+---+
+   >>> @pandas_udf("id long, v1 double, v2 double", PandasUDFType.GROUPED_MAP)  # doctest: +SKIP
--- End diff --

It took me a while to realize `v1` is a grouping key. It is also a bit 
uncommon to use a double value as a grouping key. How about something like this?

`id long, additional_key long, v double`


---




[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...

2018-09-04 Thread tgravescs
Github user tgravescs commented on a diff in the pull request:

https://github.com/apache/spark/pull/22221#discussion_r214937032
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala ---
@@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl(
   accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
   blockManagerId: BlockManagerId): Boolean = {
 // (taskId, stageId, stageAttemptId, accumUpdates)
-val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized {
+val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = {
   accumUpdates.flatMap { case (id, updates) =>
 val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None))
-taskIdToTaskSetManager.get(id).map { taskSetMgr =>
+Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr =>
--- End diff --

I agree this could happen, but it shouldn't cause issues: even before 
this change, the executor could have been removed right before this function was 
called (it's all timing-dependent), so this change does not alter that behavior.  
This code only updates accumulators for running tasks.  If the tasks had 
finished, the accumulator updates would have been processed via the task 
end events.


---




[GitHub] spark issue #22331: Tests for idempotency of FileStreamSink - Work in Progre...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22331
  
Can one of the admins verify this patch?


---




[GitHub] spark pull request #22331: Tests for idempotency of FileStreamSink - Work in...

2018-09-04 Thread misutoth
GitHub user misutoth opened a pull request:

https://github.com/apache/spark/pull/22331

Tests for idempotency of FileStreamSink - Work in Progress

## What changes were proposed in this pull request?

Reproduce File Sink duplication in a driver failure scenario to help 
understand the situation.

## How was this patch tested?
This is a test-only addition. It was run, and the last 2 tests failed, 
showing there is a problem.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/misutoth/spark file-sink-dupe

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22331.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22331


commit 0a5c6c45a4b90fc2ea8bd2647b6d3d3dfd8bd1a4
Author: Mihaly Toth 
Date:   2018-09-03T11:47:52Z

Tests for idempotency of FileStreamSink




---




[GitHub] spark issue #18906: [SPARK-21692][PYSPARK][SQL] Add nullability support to P...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/18906
  
**[Test build #95677 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95677/testReport)** for PR 18906 at commit [`dcf3f07`](https://github.com/apache/spark/commit/dcf3f0767df1801a3bb6f679226fda18a69c72aa).


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214933474
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be removed from
+// the new join conditions, if all conditions is unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException will throw by
+// [[CheckCartesianProducts]], we throw firstly here for better readable
+// information.
+throw new AnalysisException("Detected the whole commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, we need to cast the" +
+  " join to cross join by setting the configuration variable" +
+  " spark.sql.crossJoin.enabled = true.")
+  }
+} else {
+  joinType
+}
+
+  val join = Join(newLeft, newRight, newJoinType, newJoinCond)
+  if (others.nonEmpty) {
+Filter(others.reduceLeft(And), join)
--- End diff --

As pointed out by @dilipbiswal, putting the remaining conditions in a 
`Filter` above the join is correct only in the case of InnerJoin.
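
A minimal sketch of the rewrite under review, written as a plain-Python model rather than Catalyst code. The names `rewrite_join`, `RewrittenJoin`, `is_plain`, and the string-valued join types and predicates are assumptions made for illustration only. It tries to show why the leftover conditions can only become a filter above an inner join: for an outer join, rows that fail the condition must still be emitted null-padded, and a filter placed above the join would drop them.

```python
from dataclasses import dataclass
from typing import Callable, Sequence, Tuple


@dataclass(frozen=True)
class RewrittenJoin:
    """Toy stand-in for the rewritten plan; not a Spark class."""
    join_type: str                    # e.g. "inner", "cross", "left_outer"
    join_conditions: Tuple[str, ...]  # predicates kept inside the join
    post_filters: Tuple[str, ...]     # predicates moved to a filter above the join


def rewrite_join(
    join_type: str,
    conditions: Sequence[str],
    can_evaluate_within_join: Callable[[str], bool],
    cross_join_enabled: bool,
) -> RewrittenJoin:
    # Mirrors commonJoinCondition.partition(canEvaluateWithinJoin) in the diff.
    inside = tuple(c for c in conditions if can_evaluate_within_join(c))
    others = tuple(c for c in conditions if not can_evaluate_within_join(c))

    # Assumption: refuse the rewrite for non-inner joins, because filtering
    # after the join would drop rows that an outer join has to keep.
    if others and join_type != "inner":
        raise ValueError(f"cannot move {others} above a {join_type} join")

    new_type = join_type
    if conditions and not inside:
        # Every condition was unevaluable, so the join has no condition left.
        if not cross_join_enabled:
            raise ValueError("set spark.sql.crossJoin.enabled=true to allow a cross join")
        new_type = "cross"
    return RewrittenJoin(new_type, inside, others)


def is_plain(predicate: str) -> bool:
    # Hypothetical rule: anything that calls `udf(` cannot run inside the join.
    return "udf(" not in predicate


mixed = rewrite_join("inner", ["a = b", "udf(a, b)"], is_plain, cross_join_enabled=False)
only_udf = rewrite_join("inner", ["udf(a, b)"], is_plain, cross_join_enabled=True)
```

In this model `mixed` stays an inner join with `udf(a, b)` as a post-filter, while `only_udf` becomes a cross join followed by the same filter. Raising for non-inner joins is one possible policy, not necessarily the one the PR settles on.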


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214933787
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be removed from
+// the new join conditions, if all conditions is unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException will throw by
+// [[CheckCartesianProducts]], we throw firstly here for better readable
+// information.
+throw new AnalysisException("Detected the whole commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, we need to cast the" +
+  " join to cross join by setting the configuration variable" +
+  " spark.sql.crossJoin.enabled = true.")
--- End diff --

What about using the conf val in SQLConf?


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214933586
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  if (SQLConf.get.crossJoinEnabled) {
+// if condition expression is unevaluable, it will be removed from
+// the new join conditions, if all conditions is unevaluable, we should
+// change the join type to CrossJoin.
+logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+  "plan is unevaluable, it will be ignored and the join plan will be " +
+  s"turned to cross join. This plan shows below:\n $j")
+Cross
+  } else {
+// if the crossJoinEnabled is false, an AnalysisException will throw by
+// [[CheckCartesianProducts]], we throw firstly here for better readable
+// information.
+throw new AnalysisException("Detected the whole commonJoinCondition:" +
+  "$commonJoinCondition of the join plan is unevaluable, we need to cast the" +
--- End diff --

Missing `s` interpolator: without it, `$commonJoinCondition` is not substituted.


---




[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22326
  
**[Test build #95676 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95676/testReport)** for PR 22326 at commit [`a86a7d5`](https://github.com/apache/spark/commit/a86a7d5cdf8a2979a43276b7d965133263ecc809).


---




[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22326
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2840/
Test PASSed.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214932266
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
+s"turned to cross join.")
--- End diff --

Thanks, done in a86a7d5.


---




[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...

2018-09-04 Thread xuanyuanking
Github user xuanyuanking commented on a diff in the pull request:

https://github.com/apache/spark/pull/22326#discussion_r214931484
  
--- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala ---
@@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper {
 reduceLeftOption(And).map(Filter(_, left)).getOrElse(left)
   val newRight = rightJoinConditions.
 reduceLeftOption(And).map(Filter(_, right)).getOrElse(right)
-  val newJoinCond = commonJoinCondition.reduceLeftOption(And)
-
-  Join(newLeft, newRight, joinType, newJoinCond)
+  val (newJoinConditions, others) =
+commonJoinCondition.partition(canEvaluateWithinJoin)
+  val newJoinCond = newJoinConditions.reduceLeftOption(And)
+  // if condition expression is unevaluable, it will be removed from
+  // the new join conditions, if all conditions is unevaluable, we should
+  // change the join type to CrossJoin.
+  val newJoinType =
+if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) {
+  logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " +
+s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " +
+s"turned to cross join.")
+  Cross
+} else joinType
--- End diff --

Thanks, done in a86a7d5.


---




[GitHub] spark pull request #22227: [SPARK-25202] [SQL] Implements split with limit s...

2018-09-04 Thread phegstrom
Github user phegstrom commented on a diff in the pull request:

https://github.com/apache/spark/pull/22227#discussion_r214930943
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala ---
@@ -2546,15 +2546,39 @@ object functions {
   def soundex(e: Column): Column = withExpr { SoundEx(e.expr) }
 
   /**
-   * Splits str around pattern (pattern is a regular expression).
+   * Splits str around matches of the given regex.
*
-   * @note Pattern is a string representation of the regular expression.
+   * @param str a string expression to split
+   * @param regex a string representing a regular expression. The regex string should be
+   *  a Java regular expression.
*
* @group string_funcs
* @since 1.5.0
*/
-  def split(str: Column, pattern: String): Column = withExpr {
-StringSplit(str.expr, lit(pattern).expr)
+  def split(str: Column, regex: String): Column = withExpr {
--- End diff --

The reason I changed it is that every time we mentioned `pattern` in the 
comments/docs, we always added a phrase like "pattern, which is a regular 
expression ..."

It felt like that explanation would be unnecessary if we called the variable 
`regex`. Happy to change it back if you think it's necessary, though!


---




[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...

2018-09-04 Thread tgravescs
Github user tgravescs commented on the issue:

https://github.com/apache/spark/pull/22112
  
To clarify your last few comments, I think you are saying that if you were to 
fail all the reduce tasks, the shuffle write data is still there and doesn't 
get removed, and since the first write wins, a rerun might still use the older 
already-shuffled data?

So in order to fix that, we would need a way to tell the executors to remove 
that older committed shuffle data.
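
The scenario as understood in this comment can be sketched with a toy first-write-wins store. This is only an illustration in plain Python: `FirstWriteWinsStore`, `commit`, `read`, and `remove` are hypothetical names, not Spark's shuffle API, and the lists stand in for shuffle output whose order differs between attempts.

```python
class FirstWriteWinsStore:
    """Toy model of committed map output where the first commit is kept."""

    def __init__(self):
        self._committed = {}

    def commit(self, map_id, data):
        # Return True if this write was stored, False if an earlier one won.
        if map_id in self._committed:
            return False
        self._committed[map_id] = data
        return True

    def read(self, map_id):
        return self._committed[map_id]

    def remove(self, map_id):
        # The missing piece discussed above: explicitly drop old output.
        self._committed.pop(map_id, None)


store = FirstWriteWinsStore()
first_attempt_stored = store.commit(0, [3, 1, 2])   # first attempt
rerun_stored = store.commit(0, [1, 2, 3])           # rerun with a different order
seen_after_rerun = store.read(0)                    # still the first attempt's data

store.remove(0)                                     # tell the store to forget it
rerun_after_remove_stored = store.commit(0, [1, 2, 3])
seen_after_remove = store.read(0)
```

In this model the rerun's output is silently ignored until the old entry is removed, which is the behavior the comment is asking about.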


---




[GitHub] spark pull request #22227: [SPARK-25202] [SQL] Implements split with limit s...

2018-09-04 Thread phegstrom
Github user phegstrom commented on a diff in the pull request:

https://github.com/apache/spark/pull/22227#discussion_r214926165
  
--- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java ---
@@ -394,12 +394,14 @@ public void substringSQL() {
 
   @Test
   public void split() {
-assertTrue(Arrays.equals(fromString("ab,def,ghi").split(fromString(","), -1),
-  new UTF8String[]{fromString("ab"), fromString("def"), fromString("ghi")}));
-assertTrue(Arrays.equals(fromString("ab,def,ghi").split(fromString(","), 2),
-  new UTF8String[]{fromString("ab"), fromString("def,ghi")}));
-assertTrue(Arrays.equals(fromString("ab,def,ghi").split(fromString(","), 2),
-  new UTF8String[]{fromString("ab"), fromString("def,ghi")}));
+UTF8String[] negativeAndZeroLimitCase =
+new UTF8String[]{fromString("ab"), fromString("def"), fromString("ghi"), fromString("")};
+assertTrue(Arrays.equals(fromString("ab,def,ghi,").split(fromString(","), 0),
+negativeAndZeroLimitCase));
+assertTrue(Arrays.equals(fromString("ab,def,ghi,").split(fromString(","), -1),
--- End diff --

@HyukjinKwon the last two were duplicates:
```
  new UTF8String[]{fromString("ab"), fromString("def,ghi")}));

assertTrue(Arrays.equals(fromString("ab,def,ghi").split(fromString(","), 2),
  new UTF8String[]{fromString("ab"), fromString("def,ghi")}));
```

And I also thought it better to include the case where you do get an empty 
string (adding one more instance of the regex at the end). Want me to revert? 
My view is it's more exhaustive of the expected behavior, and also easier to 
see that limit = -1 should behave exactly like limit = 0.
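
The expected behavior in this test can be modeled in a few lines of Python, as a rough sketch. `split_with_limit` is a hypothetical helper built on the standard `re.split`; it is not the `UTF8String.split` implementation, and Python's regex dialect differs from Java's, so it only mirrors the simple cases shown in the diff.

```python
import re


def split_with_limit(text, regex, limit):
    """Split `text` around matches of `regex`, returning at most `limit` parts.

    Assumption taken from the test above: a zero or negative limit means
    "no limit", and trailing empty strings are kept in both cases.
    """
    if limit <= 0:
        return re.split(regex, text)
    if limit == 1:
        # re.split treats maxsplit=0 as "unlimited", so handle this case directly.
        return [text]
    return re.split(regex, text, maxsplit=limit - 1)


zero_limit = split_with_limit("ab,def,ghi,", ",", 0)
negative_limit = split_with_limit("ab,def,ghi,", ",", -1)
two_parts = split_with_limit("ab,def,ghi", ",", 2)
```

Here `zero_limit` and `negative_limit` are both `["ab", "def", "ghi", ""]`, and `two_parts` is `["ab", "def,ghi"]`. Note that `java.lang.String.split` treats a limit of 0 differently, discarding trailing empty strings, so the equivalence tested here appears to be specific to this patch.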


---




[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214918569
  
--- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala ---
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+import org.apache.spark.sql.test.SharedSQLContext
+
+class FetchedPoolSuite extends SharedSQLContext {
+  type Record = ConsumerRecord[Array[Byte], Array[Byte]]
+
+  private val dummyBytes = "dummy".getBytes
+
+  test("acquire fresh one") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+dataPool.release(cacheKey, data)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("acquire fetched data from multiple keys") {
+val dataPool = FetchedDataPool.build
+
+val cacheKeys = (0 to 10).map { partId =>
+  CacheKey("testgroup", new TopicPartition("topic", partId))
+}
+
+assert(dataPool.getCache.size === 0)
+cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) }
+
+val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0)))
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(dataPool.getCache(key).head.inUse)
+}
+
+dataList.map { case (_, data) =>
+  data.withNewPoll(testRecords(0, 5).listIterator, 5)
+}
+
+dataList.foreach { case (key, data) =>
+  dataPool.release(key, data)
+}
+
+assert(dataPool.getCache.size === cacheKeys.size)
+cacheKeys.map { key =>
+  assert(dataPool.getCache(key).size === 1)
+  assert(!dataPool.getCache(key).head.inUse)
+}
+
+dataPool.shutdown()
+  }
+
+  test("continuous use of fetched data from single key") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val data = dataPool.acquire(cacheKey, 0)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+data.withNewPoll(testRecords(0, 5).listIterator, 5)
+
+(0 to 3).foreach { _ => data.next() }
+
+dataPool.release(cacheKey, data)
+
+// suppose next batch
+
+val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData)
+
+assert(data.eq(data2))
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.release(cacheKey, data2)
+
+assert(dataPool.getCache(cacheKey).size === 1)
+assert(!dataPool.getCache(cacheKey).head.inUse)
+
+dataPool.shutdown()
+  }
+
+  test("multiple tasks referring same key continuously using fetched data") {
+val dataPool = FetchedDataPool.build
+
+val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0))
+
+assert(dataPool.getCache.get(cacheKey).isEmpty)
+
+val 
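
The tests quoted above exercise one idea: a released entry is handed back to a later `acquire` whose desired start offset matches where the entry left off, while an entry that is still in use is never shared. The following is a minimal sketch of that behaviour, not the PR's `FetchedDataPool`; `ToyPool`, `Data`, and the plain `String` key are hypothetical stand-ins, and the matching rule (`nextOffset == desiredStartOffset`) is an assumption based on the test expectations.

```scala
import scala.collection.mutable

object AcquireSketch {
  // Hypothetical stand-in for FetchedData: only tracks the next offset to read.
  final class Data(var nextOffset: Long) {
    var inUse: Boolean = false
  }

  final class ToyPool {
    private val cache = mutable.Map.empty[String, mutable.ListBuffer[Data]]

    // Reuse an idle entry whose next offset matches; otherwise create a new one.
    def acquire(key: String, desiredStartOffset: Long): Data = synchronized {
      val list = cache.getOrElseUpdate(key, mutable.ListBuffer.empty[Data])
      val data = list
        .find(d => !d.inUse && d.nextOffset == desiredStartOffset)
        .getOrElse {
          val fresh = new Data(desiredStartOffset)
          list += fresh
          fresh
        }
      data.inUse = true
      data
    }

    // Mark the entry idle again; it stays cached for a later acquire.
    def release(data: Data): Unit = synchronized {
      data.inUse = false
    }

    def size(key: String): Int = synchronized {
      cache.get(key).map(_.size).getOrElse(0)
    }
  }
}
```

With this sketch, acquiring at offset 0, advancing the entry to offset 4, releasing it, and acquiring again at offset 4 returns the same object, which mirrors the `data.eq(data2)` assertion in the quoted test.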

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214913221
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on
+ * the class, and same contract applies: after using the borrowed object, you must either call
+ * returnObject() if the object is healthy to return to pool, or invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by "spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not provided.
+ * (The instance will do its best effort to respect soft capacity but it can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+    val internalPool = new GenericKeyedObjectPool[CacheKey, InternalKafkaConsumer](
+      objectFactory, poolConfig)
+    internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+    internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 
re
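
The scaladoc quoted above states the contract callers must follow: every borrowed object is either returned (if healthy) or invalidated (if it should be destroyed). A minimal sketch of a helper that enforces this on the caller's behalf is shown below. It is not code from the PR and does not use Apache Commons Pool; `ToyKeyedPool`, `Resource`, and `withBorrowed` are hypothetical names used only to illustrate the borrow/return/invalidate pattern.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

object PoolContractSketch {
  // Hypothetical pooled resource; stands in for a non-thread-safe consumer.
  final class Resource(val key: String) {
    var closed: Boolean = false
    def close(): Unit = { closed = true }
  }

  // Toy keyed pool: idle objects per key plus a count of borrowed objects.
  final class ToyKeyedPool {
    private val idle = mutable.Map.empty[String, mutable.Queue[Resource]]
    private var active = 0

    def borrowObject(key: String): Resource = synchronized {
      active += 1
      val queue = idle.getOrElseUpdate(key, mutable.Queue.empty[Resource])
      if (queue.nonEmpty) queue.dequeue() else new Resource(key)
    }

    // Healthy object: goes back to the idle queue for reuse.
    def returnObject(r: Resource): Unit = synchronized {
      active -= 1
      idle.getOrElseUpdate(r.key, mutable.Queue.empty[Resource]) += r
    }

    // Broken object: destroyed instead of being reused.
    def invalidateObject(r: Resource): Unit = synchronized {
      active -= 1
      r.close()
    }

    def numActive: Int = synchronized(active)
    def numIdle: Int = synchronized(idle.values.map(_.size).sum)
  }

  // Borrow, run `body`, then return on success or invalidate on failure.
  def withBorrowed[T](pool: ToyKeyedPool, key: String)(body: Resource => T): T = {
    val r = pool.borrowObject(key)
    try {
      val result = body(r)
      pool.returnObject(r)
      result
    } catch {
      case NonFatal(e) =>
        pool.invalidateObject(r)
        throw e
    }
  }
}
```

Wrapping the contract in one helper means a caller cannot forget the return step, which is the failure mode the scaladoc warns about (the object would otherwise stay counted as active).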

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214916741
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
+val fetch
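
The quoted `fetch` is cut off, but its scaladoc describes the behaviour: seek to `offset`, poll, return the records together with the offset after the poll, and treat an unchanged position as a timeout. The sketch below illustrates that description with an in-memory stand-in instead of a real Kafka consumer. `FakeConsumer` and `Rec` are hypothetical, and the exact check used in the PR is not visible in this excerpt, so the `offsetAfterPoll == offset` test is an assumption.

```scala
import java.util.concurrent.TimeoutException

object FetchSketch {
  // Hypothetical record; `visible = false` models messages that a
  // read_committed consumer skips (transaction markers, aborted messages).
  final case class Rec(offset: Long, visible: Boolean)

  // In-memory stand-in for a single-partition consumer. Assumes `log` is
  // sorted by offset.
  final class FakeConsumer(log: Vector[Rec]) {
    private var position = 0L

    def seek(offset: Long): Unit = { position = offset }

    // Reads everything from the current position, advances past it, and
    // returns only the visible records.
    def poll(): Vector[Rec] = {
      val read = log.filter(_.offset >= position)
      if (read.nonEmpty) position = read.last.offset + 1
      read.filter(_.visible)
    }

    def currentPosition: Long = position
  }

  // Returns (visible records, offset after poll). The record list may be
  // empty even though the position moved forward.
  def fetch(consumer: FakeConsumer, offset: Long): (Vector[Rec], Long) = {
    consumer.seek(offset) // never rely on the cursor left by an earlier caller
    val records = consumer.poll()
    val offsetAfterPoll = consumer.currentPosition
    if (offsetAfterPoll == offset) {
      throw new TimeoutException(s"Position did not move from offset $offset")
    }
    (records, offsetAfterPoll)
  }
}
```

Returning the offset alongside the records lets the caller distinguish "nothing visible, but progress was made" from "nothing was polled at all".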

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214916493
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -18,222 +18,247 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
+import java.io.Closeable
 import java.util.concurrent.TimeoutException
 
 import scala.collection.JavaConverters._
 
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, 
KafkaConsumer, OffsetOutOfRangeException}
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.{SparkEnv, SparkException, TaskContext}
+import org.apache.spark.TaskContext
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange
+import 
org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, 
CacheKey, UNKNOWN_OFFSET}
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
-import org.apache.spark.util.UninterruptibleThread
+import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread}
+
+/**
+ * This class simplifies the usages of Kafka consumer in Spark SQL Kafka 
connector.
+ *
+ * NOTE: Like KafkaConsumer, this class is not thread-safe.
+ * NOTE for contributors: It is possible for the instance to be used from 
multiple callers,
+ * so all the methods should not rely on current cursor and use seek 
manually.
+ */
+private[kafka010] class InternalKafkaConsumer(
+val topicPartition: TopicPartition,
+val kafkaParams: ju.Map[String, Object]) extends Closeable with 
Logging {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  private val consumer = createConsumer
 
-private[kafka010] sealed trait KafkaDataConsumer {
   /**
-   * Get the record for the given offset if available.
-   *
-   * If the record is invisible (either a
-   * transaction message, or an aborted message when the consumer's 
`isolation.level` is
-   * `read_committed`), it will be skipped and this method will try to 
fetch next available record
-   * within [offset, untilOffset).
-   *
-   * This method also will try its best to detect data loss. If 
`failOnDataLoss` is `true`, it will
-   * throw an exception when we detect an unavailable offset. If 
`failOnDataLoss` is `false`, this
-   * method will try to fetch next available record within [offset, 
untilOffset).
-   *
-   * When this method tries to skip offsets due to either invisible 
messages or data loss and
-   * reaches `untilOffset`, it will return `null`.
+   * Poll messages from Kafka starting from `offset` and returns a pair of 
"list of consumer record"
+   * and "offset after poll". The list of consumer record may be empty if 
the Kafka consumer fetches
+   * some messages but all of them are not visible messages (either 
transaction messages,
+   * or aborted messages when `isolation.level` is `read_committed`).
*
-   * @param offset the offset to fetch.
-   * @param untilOffsetthe max offset to fetch. Exclusive.
-   * @param pollTimeoutMs  timeout in milliseconds to poll data from Kafka.
-   * @param failOnDataLoss When `failOnDataLoss` is `true`, this method 
will either return record at
-   *   offset if available, or throw exception.when 
`failOnDataLoss` is `false`,
-   *   this method will either return record at offset 
if available, or return
-   *   the next earliest available record less than 
untilOffset, or null. It
-   *   will not throw any exception.
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
*/
-  def get(
-  offset: Long,
-  untilOffset: Long,
-  pollTimeoutMs: Long,
-  failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = 
{
-internalConsumer.get(offset, untilOffset, pollTimeoutMs, 
failOnDataLoss)
+  def fetch(offset: Long, pollTimeoutMs: Long)
+  : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = {
+// Seek to the offset because we may call seekToBeginning or seekToEnd 
before this.
+seek(offset)
+val p = consumer.poll(pollTimeoutMs)
+val r = p.records(topicPartition)
+logDebug(s"Polled $groupId ${p.partitions()}  ${r.size}")
+val offsetAfterPoll = consumer.position(topicPartition)
+logDebug(s"Offset changed from $offset to $offsetAfterPoll after 
polling")
+val fetch

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214917536
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala
 ---
@@ -0,0 +1,299 @@
+/*
--- End diff --

Nice catch! Will rename.





[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214911381
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by [[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) {
+    var lastReleasedTimestamp: Long = Long.MaxValue
+    var lastAcquiredTimestamp: Long = Long.MinValue
+    var inUse: Boolean = false
+
+    def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+    def empty(): CachedFetchedData = {
+      val emptyData = FetchedData(
+        ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]],
+        UNKNOWN_OFFSET,
+        UNKNOWN_OFFSET)
+
+      CachedFetchedData(emptyData)
+    }
+  }
+
+  private type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, CachedFetchedDataList] = cache
--- End diff --

This keeps `cache` itself inaccessible from outside the class. Callers that 
reach it through `getCache` are told by the scaladoc that the accessor is 
meant for testing only.
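
A minimal sketch of the pattern described in this comment: the field stays `private`, and a scope-qualified accessor documents that it exists for tests. The names `AccessorSketch`, `Pool`, and `cachedKeys` are hypothetical; the sketch uses `private[AccessorSketch]` in place of `private[kafka010]` so that it compiles without a package declaration.

```scala
import scala.collection.mutable

object AccessorSketch {
  final class Pool {
    // Not reachable from outside the class.
    private val cache = mutable.Map.empty[String, Int]

    /** Retrieve internal cache. This method is only for testing. */
    private[AccessorSketch] def getCache: mutable.Map[String, Int] = cache

    def put(key: String, value: Int): Unit = cache.update(key, value)
  }

  // Stands in for test code that lives inside the same enclosing scope.
  def cachedKeys(pool: Pool): Set[String] = pool.getCache.keySet.toSet
}
```

Note that the accessor returns the same mutable map, so it documents intent rather than preventing mutation by code inside the qualified scope.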





[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214910337
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
+
+  private def startEvictorThread(): Unit = {
+executorService.scheduleAtFixedRate(new Runnable() {
+  override def run(): Unit = {
+removeIdleFetchedData()
--- End diff --

Nice catch! Will address.
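
The quoted diff schedules `removeIdleFetchedData()` at a fixed rate, but the body of that method is not part of this excerpt. The following is a sketch of what timestamp-based idle eviction could look like, assuming entries are evicted when they are not in use and were released at least a minimum idle time ago. `Entry` and `removeIdle` are hypothetical names, and the current time is passed in explicitly so the logic can be checked without a running thread.

```scala
import scala.collection.mutable

object IdleEvictionSketch {
  // Hypothetical cache entry, modelled on the fields of CachedFetchedData.
  final class Entry(val name: String) {
    // Long.MaxValue means "never released", so the entry is never expired.
    var lastReleasedTimestamp: Long = Long.MaxValue
    var inUse: Boolean = false
  }

  // Removes entries that are idle and were released at least `minIdleMillis`
  // before `now`. Returns the number of entries removed.
  def removeIdle(
      entries: mutable.ListBuffer[Entry],
      now: Long,
      minIdleMillis: Long): Int = {
    val expired = entries.filter { e =>
      !e.inUse && e.lastReleasedTimestamp <= now - minIdleMillis
    }
    entries --= expired
    expired.size
  }
}
```

A function like this could be invoked from the `Runnable` passed to `scheduleAtFixedRate`, with `now` taken from `System.currentTimeMillis()`; access to the shared buffer would then need the same synchronization as acquire and release.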





[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214917284
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -414,17 +468,37 @@ private[kafka010] case class InternalKafkaConsumer(
 }
   }
 
-  /** Create a new consumer and reset cached states */
-  private def resetConsumer(): Unit = {
-consumer.close()
-consumer = createConsumer
-fetchedData.reset()
+  /**
+   * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be
+   * empty if the Kafka consumer fetches some messages but all of them are not visible messages
+   * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed after polling. It means the
+   *  consumer polls nothing before timeout.
+   */
+  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
+val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
+fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
+  }
+
+  private def ensureConsumerAvailable(): Unit = {
+if (consumer == null) {
--- End diff --

`consumer` is defined as a `var`, so the null check is just a way to avoid 
additional wrapping here. Same as above: if we prefer `Option` I'm happy to 
change it, but I'm not sure it's better.
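
For comparison, here is a sketch of the `Option`-wrapped alternative mentioned in this comment, set against a nullable `var` with an `if (consumer == null)` check. It is not the PR's code: `Holder`, `Conn`, and `ensureAvailable` are hypothetical names, and the factory function stands in for whatever borrows or creates the real consumer.

```scala
object LazyResourceSketch {
  // Hypothetical resource standing in for the pooled consumer.
  final class Conn(val id: Int)

  final class Holder(create: () => Conn) {
    // Option-wrapped alternative to a nullable `var`.
    private var conn: Option[Conn] = None

    // Creates the resource on first use and reuses it afterwards.
    def ensureAvailable(): Conn = conn.getOrElse {
      val c = create()
      conn = Some(c)
      c
    }

    // Drops the reference so the next call creates a fresh resource.
    def release(): Unit = { conn = None }
  }
}
```

The trade-off matches the comment: `Option` makes the "not yet acquired" state explicit in the type, at the cost of one extra wrapper on each access.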





[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214910482
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
--- End diff --

Will address.





[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214910433
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
+
+  private def startEvictorThread(): Unit = {
+executorService.scheduleAtFixedRate(new Runnable() {
--- End diff --

Will address.





[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214907878
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 
re

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214908731
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala
 ---
@@ -0,0 +1,241 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.ConcurrentHashMap
+
+import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, 
PooledObject, SwallowedExceptionListener}
+import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, 
DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig}
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey
+
+/**
+ * Provides object pool for [[InternalKafkaConsumer]] which is grouped by 
[[CacheKey]].
+ *
+ * This class leverages [[GenericKeyedObjectPool]] internally, hence 
providing methods based on
+ * the class, and same contract applies: after using the borrowed object, 
you must either call
+ * returnObject() if the object is healthy to return to pool, or 
invalidateObject() if the object
+ * should be destroyed.
+ *
+ * The soft capacity of pool is determined by 
"spark.sql.kafkaConsumerCache.capacity" config value,
+ * and the pool will have reasonable default value if the value is not 
provided.
+ * (The instance will do its best effort to respect soft capacity but it 
can exceed when there's
+ * a borrowing request and there's neither free space nor idle object to 
clear.)
+ *
+ * This class guarantees that no caller will get pooled object once the 
object is borrowed and
+ * not yet returned, hence provide thread-safety usage of non-thread-safe 
[[InternalKafkaConsumer]]
+ * unless caller shares the object to multiple threads.
+ */
+private[kafka010] class InternalKafkaConsumerPool(
+objectFactory: ObjectFactory,
+poolConfig: PoolConfig) {
+
+  // the class is intended to have only soft capacity
+  assert(poolConfig.getMaxTotal < 0)
+
+  private lazy val pool = {
+val internalPool = new GenericKeyedObjectPool[CacheKey, 
InternalKafkaConsumer](
+  objectFactory, poolConfig)
+
internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener)
+internalPool
+  }
+
+  /**
+   * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no 
idle object for the key,
+   * the pool will create the [[InternalKafkaConsumer]] object.
+   *
+   * If the pool doesn't have idle object for the key and also exceeds the 
soft capacity,
+   * pool will try to clear some of idle objects.
+   *
+   * Borrowed object must be returned by either calling returnObject or 
invalidateObject, otherwise
+   * the object will be kept in pool as active object.
+   */
+  def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): 
InternalKafkaConsumer = {
+updateKafkaParamForKey(key, kafkaParams)
+
+if (getTotal == poolConfig.getSoftMaxTotal()) {
+  pool.clearOldest()
+}
+
+pool.borrowObject(key)
+  }
+
+  /** Returns borrowed object to the pool. */
+  def returnObject(consumer: InternalKafkaConsumer): Unit = {
+pool.returnObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates (destroy) borrowed object to the pool. */
+  def invalidateObject(consumer: InternalKafkaConsumer): Unit = {
+pool.invalidateObject(extractCacheKey(consumer), consumer)
+  }
+
+  /** Invalidates all idle consumers for the key */
+  def invalidateKey(key: CacheKey): Unit = {
+pool.clear(key)
+  }
+
+  /**
+   * Closes the keyed object pool. Once the pool is closed,
+   * borrowObject will fail with [[IllegalStateException]], but 
re

[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214917336
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala
 ---
@@ -414,17 +468,37 @@ private[kafka010] case class InternalKafkaConsumer(
 }
   }
 
-  /** Create a new consumer and reset cached states */
-  private def resetConsumer(): Unit = {
-consumer.close()
-consumer = createConsumer
-fetchedData.reset()
+  /**
+   * Poll messages from Kafka starting from `offset` and update 
`fetchedData`. `fetchedData` may be
+   * empty if the Kafka consumer fetches some messages but all of them are 
not visible messages
+   * (either transaction messages, or aborted messages when 
`isolation.level` is `read_committed`).
+   *
+   * @throws OffsetOutOfRangeException if `offset` is out of range.
+   * @throws TimeoutException if the consumer position is not changed 
after polling. It means the
+   *  consumer polls nothing before timeout.
+   */
+  private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = {
+val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs)
+fetchedData.withNewPoll(records.listIterator, offsetAfterPoll)
+  }
+
+  private def ensureConsumerAvailable(): Unit = {
+if (consumer == null) {
+  consumer = consumerPool.borrowObject(cacheKey, kafkaParams)
+}
+  }
+
+  private def ensureFetchedDataAvailable(offset: Long): Unit = {
+if (fetchedData == null) {
--- End diff --

Same here.


---




[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...

2018-09-04 Thread HeartSaVioR
Github user HeartSaVioR commented on a diff in the pull request:

https://github.com/apache/spark/pull/22138#discussion_r214909826
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala
 ---
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeUnit
+
+import scala.collection.mutable
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, 
UNKNOWN_OFFSET}
+import org.apache.spark.util.ThreadUtils
+
+/**
+ * Provides object pool for [[FetchedData]] which is grouped by 
[[CacheKey]].
+ *
+ * Along with CacheKey, it receives desired start offset to find cached 
FetchedData which
+ * may be stored from previous batch. If it can't find one to match, it 
will create
+ * a new FetchedData.
+ */
+private[kafka010] class FetchedDataPool {
+  import FetchedDataPool._
+
+  private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) 
{
+var lastReleasedTimestamp: Long = Long.MaxValue
+var lastAcquiredTimestamp: Long = Long.MinValue
+var inUse: Boolean = false
+
+def getObject: FetchedData = fetchedData
+  }
+
+  private object CachedFetchedData {
+def empty(): CachedFetchedData = {
+  val emptyData = FetchedData(
+ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], 
Array[Byte]]],
+UNKNOWN_OFFSET,
+UNKNOWN_OFFSET)
+
+  CachedFetchedData(emptyData)
+}
+  }
+
+  private type CachedFetchedDataList = 
mutable.ListBuffer[CachedFetchedData]
+
+  private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = 
mutable.HashMap.empty
+
+  /** Retrieve internal cache. This method is only for testing. */
+  private[kafka010] def getCache: mutable.Map[CacheKey, 
CachedFetchedDataList] = cache
+
+  private val (minEvictableIdleTimeMillis, 
evictorThreadRunIntervalMillis): (Long, Long) = {
+val conf = SparkEnv.get.conf
+
+val minEvictIdleTime = 
conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS,
+  DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS)
+
+val evictorThreadInterval = conf.getLong(
+  CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS,
+  DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS)
+
+(minEvictIdleTime, evictorThreadInterval)
+  }
+
+  private val executorService = 
ThreadUtils.newDaemonSingleThreadScheduledExecutor(
+"kafka-fetched-data--cache-evictor")
+
+  private def startEvictorThread(): Unit = {
+executorService.scheduleAtFixedRate(new Runnable() {
+  override def run(): Unit = {
+removeIdleFetchedData()
+  }
+}, 0, evictorThreadRunIntervalMillis, TimeUnit.MILLISECONDS)
+  }
+
+  startEvictorThread()
+
+  def acquire(key: CacheKey, desiredStartOffset: Long): FetchedData = 
synchronized {
+val fetchedDataList = cache.getOrElseUpdate(key, new 
CachedFetchedDataList())
+
+val cachedFetchedDataOption = fetchedDataList.find { p =>
+  !p.inUse && p.getObject.nextOffsetInFetchedData == desiredStartOffset
+}
+
+var cachedFetchedData: CachedFetchedData = null
+if (cachedFetchedDataOption.isDefined) {
+  cachedFetchedData = cachedFetchedDataOption.get
+} else {
+  cachedFetchedData = CachedFetchedData.empty()
+  fetchedDataList += cachedFetchedData
+}
+
+cachedFetchedData.lastAcquiredTimestamp = System.currentTimeMillis()
+cachedFetchedData.inUse = true
+
+cachedFetchedData.getObject
+  }
+
+  def invalidate(key: CacheKey): Unit = synchronized {
+cache.remove(key)
+  }
+
+  def release(key: CacheKey, fetchedData: FetchedData): Unit = 
s

[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22328
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95668/
Test FAILed.


---




[GitHub] spark issue #22277: [SPARK-25276] Redundant constrains when using alias

2018-09-04 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22277
  
Thank you for your interest in this issue. However, I don't think the changes 
proposed in this PR are valid: consider that you have another predicate like 
`a > z`; it is surely desired to infer a new constraint `z > z`. Please correct 
me if I'm wrong about this.
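
To make the substitution in that comment concrete, here is a toy sketch. It assumes an alias `a AS z` and models expressions with hypothetical `Attr` and `GreaterThan` case classes; these are stand-ins written for illustration, not Spark's Catalyst classes, and the sketch does not claim to reproduce the optimizer's actual rule.

```scala
// Toy expression model (hypothetical; not Catalyst).
sealed trait Expr
final case class Attr(name: String) extends Expr
final case class GreaterThan(left: Expr, right: Expr) extends Expr

object AliasConstraintSketch {
  // Replace every occurrence of attribute `from` with `to` inside `e`.
  def substitute(e: Expr, from: Attr, to: Attr): Expr = e match {
    case a: Attr if a == from => to
    case a: Attr              => a
    case GreaterThan(l, r)    =>
      GreaterThan(substitute(l, from, to), substitute(r, from, to))
  }

  // For an alias `from AS to`, keep the existing constraints and add
  // a rewritten copy of each one expressed in terms of the alias.
  def withAliased(constraints: Set[Expr], from: Attr, to: Attr): Set[Expr] =
    constraints ++ constraints.map(substitute(_, from, to))

  def main(args: Array[String]): Unit = {
    val a = Attr("a")
    val z = Attr("z")
    // Starting from `a > z`, the rewritten copy is `z > z`.
    withAliased(Set[Expr](GreaterThan(a, z)), a, z).foreach(println)
  }
}
```

Under these assumptions the resulting set holds both the original `a > z` and the rewritten `z > z`, which is the inference the comment refers to.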


---




[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22328
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22328
  
**[Test build #95668 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95668/testReport)** for PR 22328 at commit [`5164d19`](https://github.com/apache/spark/commit/5164d19deb447e49e87d20f5a1efbb0c2ee177ee).
 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22171: [SPARK-25177][SQL] When dataframe decimal type column ha...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22171
  
**[Test build #95675 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95675/testReport)** for PR 22171 at commit [`6b4c2f2`](https://github.com/apache/spark/commit/6b4c2f24c500bb972b5ffd14897bfb5fb2184ffc).


---




[GitHub] spark issue #22171: [SPARK-25177][SQL] When dataframe decimal type column ha...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22171
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22171: [SPARK-25177][SQL] When dataframe decimal type column ha...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22171
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2839/
Test PASSed.


---




[GitHub] spark issue #22171: [SPARK-25177][SQL] When dataframe decimal type column ha...

2018-09-04 Thread vinodkc
Github user vinodkc commented on the issue:

https://github.com/apache/spark/pull/22171
  
retest this please


---




[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...

2018-09-04 Thread Dooyoung-Hwang
Github user Dooyoung-Hwang commented on a diff in the pull request:

https://github.com/apache/spark/pull/22219#discussion_r214908763
  
--- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala ---
@@ -3237,6 +3238,28 @@ class Dataset[T] private[sql](
 files.toSet.toArray
   }
 
+  /**
+   * Returns the tuple of the row count and an SeqView that contains all 
rows in this Dataset.
+   *
+   * The SeqView will consume as much memory as the total size of 
serialized results which can be
+   * limited with the config 'spark.driver.maxResultSize'. Rows are 
deserialized when iterating rows
+   * with iterator of returned SeqView. Whether to collect all 
deserialized rows or to iterate them
+   * incrementally can be decided with considering total rows count and 
driver memory.
+   */
+  private[sql] def collectCountAndSeqView(): (Long, SeqView[T, Array[T]]) =
+withAction("collectCountAndSeqView", queryExecution) { plan =>
+  // This projection writes output to a `InternalRow`, which means 
applying this projection is
+  // not thread-safe. Here we create the projection inside this method 
to make `Dataset`
+  // thread-safe.
+  val objProj = GenerateSafeProjection.generate(deserializer :: Nil)
+  val (totalRowCount, internalRowsView) = plan.executeCollectSeqView()
+  (totalRowCount, internalRowsView.map { row =>
+// The row returned by SafeProjection is `SpecificInternalRow`, 
which ignore the data type
+// parameter of its `get` method, so it's safe to use null here.
+objProj(row).get(0, null).asInstanceOf[T]
+  }.asInstanceOf[SeqView[T, Array[T]]])
+}
--- End diff --

Yes, that's what I mean. I thought that 'deserializer' is declared as 
private, so there is no way to get 'deserializer' out of Dataset.
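
The Scaladoc in the diff above says rows are deserialized only when the returned view is iterated. A minimal sketch of that lazy behaviour with a plain Scala collection view follows; `LazyViewSketch`, `takeDecoded`, and the byte-array "rows" are hypothetical stand-ins, not the `Dataset` or `SeqView[T, Array[T]]` code from the PR.

```scala
object LazyViewSketch {
  // Returns the first `n` decoded rows together with the number of rows
  // that were actually decoded while producing them.
  def takeDecoded(serialized: Seq[Array[Byte]], n: Int): (List[String], Int) = {
    var decoded = 0
    // `view.map` only records the transformation; nothing is decoded yet.
    val rows = serialized.view.map { bytes =>
      decoded += 1
      new String(bytes, "UTF-8")
    }
    // Decoding happens element by element as the iterator advances.
    val firstN = rows.iterator.take(n).toList
    (firstN, decoded)
  }

  def main(args: Array[String]): Unit = {
    val serialized = Seq("r1", "r2", "r3").map(_.getBytes("UTF-8"))
    val (rows, decodedCount) = takeDecoded(serialized, 2)
    println(s"rows=$rows decoded=$decodedCount")
  }
}
```

The caller keeps only the serialized form in memory and pays the decoding cost per element, which is the trade-off the Scaladoc describes between collecting everything and iterating incrementally.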


---




[GitHub] spark issue #22327: [SPARK-25330][BUILD] Revert Hadoop 2.7 to 2.7.3

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22327
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22327: [SPARK-25330][BUILD] Revert Hadoop 2.7 to 2.7.3

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22327
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95664/
Test PASSed.


---




[GitHub] spark issue #22327: [SPARK-25330][BUILD] Revert Hadoop 2.7 to 2.7.3

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22327
  
**[Test build #95664 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95664/testReport)** for PR 22327 at commit [`f89448b`](https://github.com/apache/spark/commit/f89448b7b0598a59f750a324e869e7768cfedbc1).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recycle Spar...

2018-09-04 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/22330
  
LGTM


---




[GitHub] spark pull request #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recyc...

2018-09-04 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/22330#discussion_r214903740
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala
 ---
@@ -45,6 +45,7 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest 
with SharedSQLContext {
 
   protected override def afterAll() = {
 SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, 
originalLimitFlatGlobalLimit)
+super.afterAll()
--- End diff --

Good catch!


---




[GitHub] spark issue #22313: [SPARK-25306][SQL] Avoid skewed filter trees to speed up...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22313
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #22313: [SPARK-25306][SQL] Avoid skewed filter trees to speed up...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22313
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95669/
Test FAILed.


---




[GitHub] spark issue #22313: [SPARK-25306][SQL] Avoid skewed filter trees to speed up...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22313
  
**[Test build #95669 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95669/testReport)** for PR 22313 at commit [`3cd4443`](https://github.com/apache/spark/commit/3cd444306c3b8b6c42a74b7cfb0755b8ce209c84).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22141: [SPARK-25154][SQL] Support NOT IN sub-queries inside nes...

2018-09-04 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/22141
  
@dilipbiswal Any update?


---




[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...

2018-09-04 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/19691
  
@DazhuangSu still busy?


---




[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4

2018-09-04 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/22240
  
LGTM


---




[GitHub] spark issue #22048: [SPARK-25108][SQL] Fix the show method to display the wi...

2018-09-04 Thread xuejianbest
Github user xuejianbest commented on the issue:

https://github.com/apache/spark/pull/22048
  
I see, thanks.


---




[GitHub] spark pull request #22316: [SPARK-25048][SQL] Pivoting by multiple columns i...

2018-09-04 Thread maropu
Github user maropu commented on a diff in the pull request:

https://github.com/apache/spark/pull/22316#discussion_r214894498
  
--- Diff: 
sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala ---
@@ -308,4 +308,27 @@ class DataFramePivotSuite extends QueryTest with 
SharedSQLContext {
 
 assert(exception.getMessage.contains("aggregate functions are not 
allowed"))
   }
+
+  test("pivoting column list with values") {
+val expected = Row(2012, 1.0, null) :: Row(2013, 48000.0, 3.0) 
:: Nil
+val df = trainingSales
+  .groupBy($"sales.year")
+  .pivot(struct(lower($"sales.course"), $"training"), Seq(
+struct(lit("dotnet"), lit("Experts")),
+struct(lit("java"), lit("Dummies")))
+  ).agg(sum($"sales.earnings"))
+
+checkAnswer(df, expected)
+  }
+
+  test("pivoting column list") {
+val exception = intercept[RuntimeException] {
+  trainingSales
+.groupBy($"sales.year")
+.pivot(struct(lower($"sales.course"), $"training"))
+.agg(sum($"sales.earnings"))
+.collect()
--- End diff --

I think invalid queries basically throw `AnalysisException`. But, yes, 
indeed, we'd better keep the current behaviour. Thanks!


---




[GitHub] spark issue #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recycle Spar...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22330
  
**[Test build #95674 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95674/testReport)** for PR 22330 at commit [`0b01066`](https://github.com/apache/spark/commit/0b010669b15781a648f7c7bde13556ddb7c003c3).


---




[GitHub] spark issue #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recycle Spar...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22330
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2838/
Test PASSed.


---




[GitHub] spark issue #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recycle Spar...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22330
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recyc...

2018-09-04 Thread jiangxb1987
GitHub user jiangxb1987 opened a pull request:

https://github.com/apache/spark/pull/22330

[SPARK-19355][SQL][FOLLOWUP][TEST] Properly recycle SparkSession on 
TakeOrderedAndProjectSuite finishes

## What changes were proposed in this pull request?

Previously, in `TakeOrderedAndProjectSuite`, the SparkSession was not 
recycled when the test suite finished.
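
A minimal sketch of why the missing `super.afterAll()` call matters, using hypothetical stand-in traits (`SuiteLike`, `SharedSessionLike`, `RestoringSuite`) rather than ScalaTest or Spark's `SharedSQLContext`. The `try`/`finally` is an extra precaution in this sketch and is not part of the patch itself.

```scala
// Stand-in for a test framework's base suite (hypothetical).
trait SuiteLike {
  def afterAll(): Unit = ()
}

// Stand-in for a mixin that owns a shared session and releases it in afterAll().
trait SharedSessionLike extends SuiteLike {
  var sessionStopped = false
  override def afterAll(): Unit = {
    sessionStopped = true // models stopping the shared session
    super.afterAll()
  }
}

// A suite that restores a config value and then delegates to the mixin.
class RestoringSuite extends SharedSessionLike {
  var confRestored = false
  override def afterAll(): Unit = {
    try {
      confRestored = true // models restoring the original config value
    } finally {
      // Without this call the mixin's cleanup never runs.
      super.afterAll()
    }
  }
}

object AfterAllSketch {
  def main(args: Array[String]): Unit = {
    val suite = new RestoringSuite
    suite.afterAll()
    println(s"confRestored=${suite.confRestored} sessionStopped=${suite.sessionStopped}")
  }
}
```

If the override omitted the `super.afterAll()` call, `sessionStopped` would stay `false` in this model, mirroring the leaked session described above.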

## How was this patch tested?

N/A

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jiangxb1987/spark SPARK-19355

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/22330.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #22330


commit 0b010669b15781a648f7c7bde13556ddb7c003c3
Author: Xingbo Jiang 
Date:   2018-09-04T12:23:30Z

properly recycle SparkSession on TakeOrderedAndProjectSuite finishes.




---




[GitHub] spark issue #22320: [SPARK-25313][SQL]Fix regression in FileFormatWriter out...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22320
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22320: [SPARK-25313][SQL]Fix regression in FileFormatWriter out...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22320
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95663/
Test PASSed.


---




[GitHub] spark issue #22320: [SPARK-25313][SQL]Fix regression in FileFormatWriter out...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22320
  
**[Test build #95663 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95663/testReport)**
 for PR 22320 at commit 
[`3ca072d`](https://github.com/apache/spark/commit/3ca072d18474d1536c3ac729fe1e0b79cd855cca).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22240
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22240
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95661/
Test PASSed.


---




[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22240
  
**[Test build #95661 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95661/testReport)**
 for PR 22240 at commit 
[`9b6a47b`](https://github.com/apache/spark/commit/9b6a47bf718309eb0b5a22a0282a5a7c4226e991).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22179: [SPARK-23131][BUILD] Upgrade Kryo to 4.0.2

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22179
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22179: [SPARK-23131][BUILD] Upgrade Kryo to 4.0.2

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22179
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2837/
Test PASSed.


---




[GitHub] spark issue #22179: [SPARK-23131][BUILD] Upgrade Kryo to 4.0.2

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22179
  
**[Test build #95673 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95673/testReport)**
 for PR 22179 at commit 
[`d1dac1e`](https://github.com/apache/spark/commit/d1dac1e75d450ca26a460cb36f8ac0684a251c64).


---




[GitHub] spark issue #22179: [SPARK-23131][BUILD] Upgrade Kryo to 4.0.2

2018-09-04 Thread wangyum
Github user wangyum commented on the issue:

https://github.com/apache/spark/pull/22179
  
Thanks, @dongjoon-hyun 


---




[GitHub] spark pull request #22321: [DOC] Update some outdated links

2018-09-04 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/22321


---




[GitHub] spark issue #22321: [DOC] Update some outdated links

2018-09-04 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/22321
  
Merged to master.


---




[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22326
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95662/
Test PASSed.


---




[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22282
  
**[Test build #95672 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95672/testReport)**
 for PR 22282 at commit 
[`253a894`](https://github.com/apache/spark/commit/253a894bedbd3a9642e529ad937dcb99dae346c7).


---




[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22326
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22326
  
**[Test build #95662 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95662/testReport)**
 for PR 22326 at commit 
[`d58f3a5`](https://github.com/apache/spark/commit/d58f3a58f5be3efaab7e56b353ce0dfeb7610f7d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22321: [DOC] Update some outdated links

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22321
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95656/
Test PASSed.


---




[GitHub] spark issue #22321: [DOC] Update some outdated links

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22321
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #22321: [DOC] Update some outdated links

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22321
  
**[Test build #95656 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95656/testReport)**
 for PR 22321 at commit 
[`d9bbf3c`](https://github.com/apache/spark/commit/d9bbf3c4a7be82d66eb643a42c2724cd30ea1ad5).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #22226: [SPARK-25252][SQL] Support arrays of any types by to_jso...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/6
  
**[Test build #95671 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95671/testReport)**
 for PR 6 at commit 
[`90c9687`](https://github.com/apache/spark/commit/90c968772d74c8aadc7a4d0e74e226554b921486).


---




[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22282
  
**[Test build #95670 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95670/testReport)**
 for PR 22282 at commit 
[`2254009`](https://github.com/apache/spark/commit/22540092f7a786585afeb5a861b1c329722e3d0b).


---




[GitHub] spark pull request #22226: [SPARK-25252][SQL] Support arrays of any types by...

2018-09-04 Thread MaxGekk
Github user MaxGekk commented on a diff in the pull request:

https://github.com/apache/spark/pull/6#discussion_r214878373
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala
 ---
@@ -663,12 +662,7 @@ case class StructsToJson(
 rowSchema, writer, new JSONOptions(options, timeZoneId.get))
 
   @transient
-  lazy val rowSchema = child.dataType match {
-case st: StructType => st
-case ArrayType(st: StructType, _) => st
-case mt: MapType => mt
-case ArrayType(mt: MapType, _) => mt
-  }
+  lazy val rowSchema = child.dataType
--- End diff --

I tried removing `lazy` and got many test errors like:
```
Invalid call to dataType on unresolved object, tree: 'a
org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'a
    at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105)
    at org.apache.spark.sql.catalyst.expressions.StructsToJson.<init>(jsonExpressions.scala:665)
```
If you don't mind, I will keep it `lazy`.
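
To illustrate why dropping `lazy` can fail this way, here is a minimal, self-contained sketch. All names in it (`Expr`, `Unresolved`, `Resolved`, `EagerToJson`, `LazyToJson`, and the local `UnresolvedException`) are hypothetical stand-ins, not Catalyst classes. It assumes only that an unresolved child throws when asked for its data type, as in the trace above.

```scala
// Hypothetical stand-ins for illustration only; not Spark's Catalyst classes.
final class UnresolvedException(msg: String) extends RuntimeException(msg)

trait Expr { def dataType: String }

// Mimics an unresolved attribute: its type is unknown until analysis resolves it.
final case class Unresolved(name: String) extends Expr {
  def dataType: String =
    throw new UnresolvedException(
      s"Invalid call to dataType on unresolved object, tree: '$name")
}

// Mimics a resolved expression whose type is known.
final case class Resolved(dataType: String) extends Expr

// Eager val: child.dataType is evaluated inside the constructor.
final class EagerToJson(child: Expr) {
  val rowSchema: String = child.dataType
}

// Lazy val: child.dataType is evaluated on the first access to rowSchema.
final class LazyToJson(child: Expr) {
  lazy val rowSchema: String = child.dataType
}
```

In this sketch, `new EagerToJson(Unresolved("a"))` throws during construction, while `new LazyToJson(Unresolved("a"))` constructs fine and only throws if `rowSchema` is read before the child is replaced by a resolved one.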


---




[GitHub] spark issue #22327: [SPARK-25330][BUILD] Revert Hadoop 2.7 to 2.7.3

2018-09-04 Thread wangyum
Github user wangyum commented on the issue:

https://github.com/apache/spark/pull/22327
  
Yes. This is a Hadoop thing. I tried building Hadoop 2.7.7 with 
[`Configuration.getRestrictParserDefault(Object resource)`](https://github.com/apache/hadoop/blob/release-2.7.7-RC0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java#L236)
 set to both true and false.
It succeeded when `Configuration.getRestrictParserDefault(Object 
resource)=false`, but failed when 
`Configuration.getRestrictParserDefault(Object resource)=true`.


---




[GitHub] spark issue #22321: [DOC] Update some outdated links

2018-09-04 Thread dongjoon-hyun
Github user dongjoon-hyun commented on the issue:

https://github.com/apache/spark/pull/22321
  
I see. I'll try after Jenkins passes. Thank you, @HyukjinKwon .


---




[GitHub] spark issue #22313: [SPARK-25306][SQL] Avoid skewed filter trees to speed up...

2018-09-04 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/22313
  
**[Test build #95669 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95669/testReport)**
 for PR 22313 at commit 
[`3cd4443`](https://github.com/apache/spark/commit/3cd444306c3b8b6c42a74b7cfb0755b8ce209c84).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #22313: [SPARK-25306][SQL] Avoid skewed filter trees to speed up...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22313
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2836/
Test PASSed.


---




[GitHub] spark issue #22313: [SPARK-25306][SQL] Avoid skewed filter trees to speed up...

2018-09-04 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/22313
  
Merged build finished. Test PASSed.


---



