[GitHub] spark issue #22226: [SPARK-25252][SQL] Support arrays of any types by to_jso...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22226 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22226: [SPARK-25252][SQL] Support arrays of any types by to_jso...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22226 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95671/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22226: [SPARK-25252][SQL] Support arrays of any types by to_jso...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22226 **[Test build #95671 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95671/testReport)** for PR 22226 at commit [`90c9687`](https://github.com/apache/spark/commit/90c968772d74c8aadc7a4d0e74e226554b921486). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user dilipbiswal commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214962083 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + --- End diff -- @mgaido91 > This is, indeed, arguable. I think that letting the user choose is a good idea. If the users runs the query and gets an AnalysisException because he/she is trying to perform a cartesian product, he/she can decide: ok, I am doing a wrong thing, let's change it; or he/she can say, well, one of my 2 tables involved contains 10 rows, not a big deal, I want to perform it nonetheless, let's set spark.sql.crossJoin.enabled=true and run it. Sounds reasonable .. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
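To make the trade-off described above concrete, here is a minimal sketch of the opt-in the reviewers are discussing, assuming a local Spark 2.x session; the table names and sizes are illustrative, and only `spark.sql.crossJoin.enabled` and `Dataset.crossJoin` are taken from Spark's actual API.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("cross-join-opt-in").getOrCreate()
import spark.implicits._

val left  = Seq((1, "a"), (2, "b")).toDF("id", "tag")            // the small, "10 row" side
val right = Seq((1, 10.0), (2, 20.0), (3, 30.0)).toDF("k", "v")

// If a join's only condition cannot be evaluated inside the join (for example a Python UDF
// over attributes of both sides), the rewrite in this PR degrades it to a cartesian product.
// A user who decides the cost is acceptable opts in explicitly:
spark.conf.set("spark.sql.crossJoin.enabled", "true")

// The explicit form of the cartesian product the warning/exception is talking about.
left.crossJoin(right).show()
```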
[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22282 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22282 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95672/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22282 **[Test build #95672 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95672/testReport)** for PR 22282 at commit [`253a894`](https://github.com/apache/spark/commit/253a894bedbd3a9642e529ad937dcb99dae346c7). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22282 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95670/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22282 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22282 **[Test build #95670 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95670/testReport)** for PR 22282 at commit [`2254009`](https://github.com/apache/spark/commit/22540092f7a786585afeb5a861b1c329722e3d0b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22309: [SPARK-20384][CORE] Support value class in schema of Dat...
Github user mt40 commented on the issue: https://github.com/apache/spark/pull/22309 @cloud-fan @liancheng @marmbrus could you please take a look at this and start the tests? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18906: [SPARK-21692][PYSPARK][SQL] Add nullability support to P...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18906 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95677/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18906: [SPARK-21692][PYSPARK][SQL] Add nullability support to P...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/18906 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18906: [SPARK-21692][PYSPARK][SQL] Add nullability support to P...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18906 **[Test build #95677 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95677/testReport)** for PR 18906 at commit [`dcf3f07`](https://github.com/apache/spark/commit/dcf3f0767df1801a3bb6f679226fda18a69c72aa). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22228: [SPARK-25124][ML]VectorSizeHint setSize and getSize don'...
Github user wangyum commented on the issue: https://github.com/apache/spark/pull/22228 This is already merged. @huaxingao Could you please close this PR? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22221: [SPARK-25231] : Fix synchronization of executor heartbea...
Github user tgravescs commented on the issue: https://github.com/apache/spark/pull/22221 +1 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22329: [SPARK-25328][PYTHON] Add an example for having t...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/22329#discussion_r214940744 --- Diff: python/pyspark/sql/functions.py --- @@ -2804,6 +2804,20 @@ def pandas_udf(f=None, returnType=None, functionType=None): | 1|1.5| | 2|6.0| +---+---+ + >>> @pandas_udf("id long, v1 double, v2 double", PandasUDFType.GROUPED_MAP) # doctest: +SKIP --- End diff -- It took me a while to realize `v1` is a grouping key. It is also a bit uncommon to use a double value as a grouping key. How about we do something like `id long, additional_key long, v double`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22221: [SPARK-25231] : Fix synchronization of executor h...
Github user tgravescs commented on a diff in the pull request: https://github.com/apache/spark/pull/22221#discussion_r214937032 --- Diff: core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala --- @@ -517,10 +517,10 @@ private[spark] class TaskSchedulerImpl( accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])], blockManagerId: BlockManagerId): Boolean = { // (taskId, stageId, stageAttemptId, accumUpdates) -val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = synchronized { +val accumUpdatesWithTaskIds: Array[(Long, Int, Int, Seq[AccumulableInfo])] = { accumUpdates.flatMap { case (id, updates) => val accInfos = updates.map(acc => acc.toInfo(Some(acc.value), None)) -taskIdToTaskSetManager.get(id).map { taskSetMgr => +Option(taskIdToTaskSetManager.get(id)).map { taskSetMgr => --- End diff -- I agree this could happen, but it shouldn't cause issues, because before this change the executor could have been removed right before this function was called (it's all timing dependent), so this does not change the functionality. This is only to update accumulators for running tasks. If the tasks had finished, then the accumulator updates would have been processed via the task end events. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
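For readers following the diff, a standalone sketch of the null-handling idiom it relies on: `java.util.concurrent.ConcurrentHashMap.get` returns `null` for a missing key, and wrapping the result in `Option(...)` turns a lookup for an already-removed task into a `None` that the surrounding `flatMap` simply drops. The map and names below are stand-ins, not the actual Spark classes.

```scala
import java.util.concurrent.ConcurrentHashMap

// Stand-in for taskIdToTaskSetManager: taskId -> task set manager name.
val taskIdToManager = new ConcurrentHashMap[Long, String]()
taskIdToManager.put(1L, "TaskSet_0.0")

def routeAccumUpdate(taskId: Long): Option[String] =
  // get returns null once the task or executor has been cleaned up; Option(null) == None,
  // so the accumulator update for that task is silently skipped, as described above.
  Option(taskIdToManager.get(taskId)).map(mgr => s"update routed to $mgr")

println(routeAccumUpdate(1L))  // Some(update routed to TaskSet_0.0)
println(routeAccumUpdate(42L)) // None: task already finished or executor removed
```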
[GitHub] spark issue #22331: Tests for idempotency of FileStreamSink - Work in Progre...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22331 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22331: Tests for idempotency of FileStreamSink - Work in...
GitHub user misutoth opened a pull request: https://github.com/apache/spark/pull/22331 Tests for idempotency of FileStreamSink - Work in Progress ## What changes were proposed in this pull request? Reproduce the File Sink duplication in a driver-failure scenario, to help understand the situation. ## How was this patch tested? This is a test-only addition; when run, the last two tests failed, showing there is a problem. You can merge this pull request into a Git repository by running: $ git pull https://github.com/misutoth/spark file-sink-dupe Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22331.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22331 commit 0a5c6c45a4b90fc2ea8bd2647b6d3d3dfd8bd1a4 Author: Mihaly Toth Date: 2018-09-03T11:47:52Z Tests for idempotency of FileStreamSink --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #18906: [SPARK-21692][PYSPARK][SQL] Add nullability support to P...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/18906 **[Test build #95677 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95677/testReport)** for PR 18906 at commit [`dcf3f07`](https://github.com/apache/spark/commit/dcf3f0767df1801a3bb6f679226fda18a69c72aa). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214933474 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + + " join to cross join by setting the configuration variable" + + " spark.sql.crossJoin.enabled = true.") + } +} else { + joinType +} + + val join = Join(newLeft, newRight, newJoinType, newJoinCond) + if (others.nonEmpty) { +Filter(others.reduceLeft(And), join) --- End diff -- as pointed out by @dilipbiswal, this is correct only in the case of InnerJoin --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
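A self-contained illustration of the point (the tables and values are made up): for an inner join, a predicate gives the same result whether it sits in the join condition or in a `Filter` above the join, but for an outer join, pulling it above the join discards the null-extended rows, which is why the residual-filter rewrite is only safe for inner joins.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val l = Seq((1, "a"), (2, "b")).toDF("id", "tag")
val r = Seq((1, 10)).toDF("rid", "score")

// Inner join: condition vs. post-join filter are interchangeable.
val innerInCondition   = l.join(r, l("id") === r("rid") && r("score") > 5, "inner")
val innerFilteredOnTop = l.join(r, l("id") === r("rid"), "inner").where(r("score") > 5)
// both return only the row for id = 1

// Left outer join: the same rewrite changes the result.
val outerInCondition   = l.join(r, l("id") === r("rid") && r("score") > 5, "left_outer")
// keeps (2, "b") with nulls on the right side
val outerFilteredOnTop = l.join(r, l("id") === r("rid"), "left_outer").where(r("score") > 5)
// drops (2, "b"): the null-extended row does not satisfy score > 5
```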
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214933787 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + + " join to cross join by setting the configuration variable" + + " spark.sql.crossJoin.enabled = true.") --- End diff -- What about using the conf val in SQLConf? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
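If the suggestion is adopted, the message could be built from the registered conf entry rather than a hand-written key; a small sketch, assuming the existing `CROSS_JOINS_ENABLED` entry in `SQLConf` (the surrounding message text is illustrative):

```scala
import org.apache.spark.sql.internal.SQLConf

// Referencing the ConfigEntry keeps the key name in one place, so the error text
// cannot drift out of sync with the actual configuration key.
val hint = s"cast the join to a cross join by setting ${SQLConf.CROSS_JOINS_ENABLED.key}=true"
println(hint) // ... by setting spark.sql.crossJoin.enabled=true
```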
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214933586 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,38 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + if (SQLConf.get.crossJoinEnabled) { +// if condition expression is unevaluable, it will be removed from +// the new join conditions, if all conditions is unevaluable, we should +// change the join type to CrossJoin. +logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + + "plan is unevaluable, it will be ignored and the join plan will be " + + s"turned to cross join. This plan shows below:\n $j") +Cross + } else { +// if the crossJoinEnabled is false, an AnalysisException will throw by +// [[CheckCartesianProducts]], we throw firstly here for better readable +// information. +throw new AnalysisException("Detected the whole commonJoinCondition:" + + "$commonJoinCondition of the join plan is unevaluable, we need to cast the" + --- End diff -- missing `s` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
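The "missing `s`" refers to Scala string interpolation: without the `s` prefix, the `$commonJoinCondition` placeholder is emitted literally instead of being substituted. A tiny standalone illustration:

```scala
val commonJoinCondition = Seq("pythonUDF0(a.id, b.id)")

// No interpolator: the placeholder stays as literal text in the exception message.
println("Detected the whole commonJoinCondition:" + "$commonJoinCondition of the join plan")
// Detected the whole commonJoinCondition:$commonJoinCondition of the join plan

// With the s-interpolator the value is substituted as intended.
println("Detected the whole commonJoinCondition:" + s"$commonJoinCondition of the join plan")
// Detected the whole commonJoinCondition:List(pythonUDF0(a.id, b.id)) of the join plan
```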
[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22326 **[Test build #95676 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95676/testReport)** for PR 22326 at commit [`a86a7d5`](https://github.com/apache/spark/commit/a86a7d5cdf8a2979a43276b7d965133263ecc809). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22326 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2840/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214932266 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join.") --- End diff -- Thanks, done in a86a7d5. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22326: [SPARK-25314][SQL] Fix Python UDF accessing attib...
Github user xuanyuanking commented on a diff in the pull request: https://github.com/apache/spark/pull/22326#discussion_r214931484 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala --- @@ -1208,9 +1208,26 @@ object PushPredicateThroughJoin extends Rule[LogicalPlan] with PredicateHelper { reduceLeftOption(And).map(Filter(_, left)).getOrElse(left) val newRight = rightJoinConditions. reduceLeftOption(And).map(Filter(_, right)).getOrElse(right) - val newJoinCond = commonJoinCondition.reduceLeftOption(And) - - Join(newLeft, newRight, joinType, newJoinCond) + val (newJoinConditions, others) = +commonJoinCondition.partition(canEvaluateWithinJoin) + val newJoinCond = newJoinConditions.reduceLeftOption(And) + // if condition expression is unevaluable, it will be removed from + // the new join conditions, if all conditions is unevaluable, we should + // change the join type to CrossJoin. + val newJoinType = +if (commonJoinCondition.nonEmpty && newJoinCond.isEmpty) { + logWarning(s"The whole commonJoinCondition:$commonJoinCondition of the join " + +s"plan:\n $j is unevaluable, it will be ignored and the join plan will be " + +s"turned to cross join.") + Cross +} else joinType --- End diff -- Thanks, done in a86a7d5. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22227: [SPARK-25202] [SQL] Implements split with limit s...
Github user phegstrom commented on a diff in the pull request: https://github.com/apache/spark/pull/22227#discussion_r214930943 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/functions.scala --- @@ -2546,15 +2546,39 @@ object functions { def soundex(e: Column): Column = withExpr { SoundEx(e.expr) } /** - * Splits str around pattern (pattern is a regular expression). + * Splits str around matches of the given regex. * - * @note Pattern is a string representation of the regular expression. + * @param str a string expression to split + * @param regex a string representing a regular expression. The regex string should be + * a Java regular expression. * * @group string_funcs * @since 1.5.0 */ - def split(str: Column, pattern: String): Column = withExpr { -StringSplit(str.expr, lit(pattern).expr) + def split(str: Column, regex: String): Column = withExpr { --- End diff -- The reason I changed it is that every time we mentioned `pattern` in the comments/docs, we always added a phrase like "pattern, which is a regular expression ...", which felt like unnecessary explanation that calling the variable `regex` avoids. Happy to change it if you think that's necessary, though! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22112: [SPARK-23243][Core] Fix RDD.repartition() data correctne...
Github user tgravescs commented on the issue: https://github.com/apache/spark/pull/22112 To clarify your last few comments: I think you are saying that if you were to fail all the reduce tasks, the shuffle write data is still there and doesn't get removed, and since the first write wins on rerun, it might still use the older, already-shuffled data? So in order to fix that, we would need a way to tell the executors to remove that older committed shuffle data. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22227: [SPARK-25202] [SQL] Implements split with limit s...
Github user phegstrom commented on a diff in the pull request: https://github.com/apache/spark/pull/22227#discussion_r214926165 --- Diff: common/unsafe/src/test/java/org/apache/spark/unsafe/types/UTF8StringSuite.java --- @@ -394,12 +394,14 @@ public void substringSQL() { @Test public void split() { - assertTrue(Arrays.equals(fromString("ab,def,ghi").split(fromString(","), -1), - new UTF8String[]{fromString("ab"), fromString("def"), fromString("ghi")})); - assertTrue(Arrays.equals(fromString("ab,def,ghi").split(fromString(","), 2), - new UTF8String[]{fromString("ab"), fromString("def,ghi")})); - assertTrue(Arrays.equals(fromString("ab,def,ghi").split(fromString(","), 2), - new UTF8String[]{fromString("ab"), fromString("def,ghi")})); +UTF8String[] negativeAndZeroLimitCase = +new UTF8String[]{fromString("ab"), fromString("def"), fromString("ghi"), fromString("")}; + assertTrue(Arrays.equals(fromString("ab,def,ghi,").split(fromString(","), 0), +negativeAndZeroLimitCase)); + assertTrue(Arrays.equals(fromString("ab,def,ghi,").split(fromString(","), -1), --- End diff -- @HyukjinKwon the last two were duplicates: ``` new UTF8String[]{fromString("ab"), fromString("def,ghi")})); assertTrue(Arrays.equals(fromString("ab,def,ghi").split(fromString(","), 2), new UTF8String[]{fromString("ab"), fromString("def,ghi")})); ``` I also thought it better to include the case where you do get an empty string (by adding one more instance of the separator at the end). Want me to revert? My view is that this is more exhaustive of the expected behavior, and it also makes it easier to see that limit = -1 should behave exactly like limit = 0. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
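For reference, a sketch of the limit semantics those assertions encode, written directly against `UTF8String.split` as changed by this PR; the outputs are taken from the test cases above and should be read as the intended post-PR behavior rather than a documented contract.

```scala
import org.apache.spark.unsafe.types.UTF8String

val input = UTF8String.fromString("ab,def,ghi,")
val sep   = UTF8String.fromString(",")

// limit <= 0: the separator is applied as many times as possible and trailing empty
// strings are kept, so -1 and 0 produce the same four elements: ab, def, ghi, ""
val unlimited = input.split(sep, -1).map(_.toString)
val zeroLimit = input.split(sep, 0).map(_.toString)

// positive limit: at most `limit` elements, the last one holding the rest of the input,
// i.e. ab and "def,ghi," here.
val limited = input.split(sep, 2).map(_.toString)
```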
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214918569 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala --- @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} + +import scala.collection.JavaConverters._ + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey +import org.apache.spark.sql.test.SharedSQLContext + +class FetchedPoolSuite extends SharedSQLContext { + type Record = ConsumerRecord[Array[Byte], Array[Byte]] + + private val dummyBytes = "dummy".getBytes + + test("acquire fresh one") { +val dataPool = FetchedDataPool.build + +val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0)) + +assert(dataPool.getCache.get(cacheKey).isEmpty) + +val data = dataPool.acquire(cacheKey, 0) + +assert(dataPool.getCache(cacheKey).size === 1) +assert(dataPool.getCache(cacheKey).head.inUse) + +data.withNewPoll(testRecords(0, 5).listIterator, 5) + +dataPool.release(cacheKey, data) + +assert(dataPool.getCache(cacheKey).size === 1) +assert(!dataPool.getCache(cacheKey).head.inUse) + +dataPool.shutdown() + } + + test("acquire fetched data from multiple keys") { +val dataPool = FetchedDataPool.build + +val cacheKeys = (0 to 10).map { partId => + CacheKey("testgroup", new TopicPartition("topic", partId)) +} + +assert(dataPool.getCache.size === 0) +cacheKeys.foreach { key => assert(dataPool.getCache.get(key).isEmpty) } + +val dataList = cacheKeys.map(key => (key, dataPool.acquire(key, 0))) + +assert(dataPool.getCache.size === cacheKeys.size) +cacheKeys.map { key => + assert(dataPool.getCache(key).size === 1) + assert(dataPool.getCache(key).head.inUse) +} + +dataList.map { case (_, data) => + data.withNewPoll(testRecords(0, 5).listIterator, 5) +} + +dataList.foreach { case (key, data) => + dataPool.release(key, data) +} + +assert(dataPool.getCache.size === cacheKeys.size) +cacheKeys.map { key => + assert(dataPool.getCache(key).size === 1) + assert(!dataPool.getCache(key).head.inUse) +} + +dataPool.shutdown() + } + + test("continuous use of fetched data from single key") { +val dataPool = FetchedDataPool.build + +val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0)) + +assert(dataPool.getCache.get(cacheKey).isEmpty) + +val data = dataPool.acquire(cacheKey, 0) + +assert(dataPool.getCache(cacheKey).size === 1) +assert(dataPool.getCache(cacheKey).head.inUse) + +data.withNewPoll(testRecords(0, 5).listIterator, 5) + +(0 to 3).foreach { _ => data.next() } + +dataPool.release(cacheKey, 
data) + +// suppose next batch + +val data2 = dataPool.acquire(cacheKey, data.nextOffsetInFetchedData) + +assert(data.eq(data2)) + +assert(dataPool.getCache(cacheKey).size === 1) +assert(dataPool.getCache(cacheKey).head.inUse) + +dataPool.release(cacheKey, data2) + +assert(dataPool.getCache(cacheKey).size === 1) +assert(!dataPool.getCache(cacheKey).head.inUse) + +dataPool.shutdown() + } + + test("multiple tasks referring same key continuously using fetched data") { +val dataPool = FetchedDataPool.build + +val cacheKey = CacheKey("testgroup", new TopicPartition("topic", 0)) + +assert(dataPool.getCache.get(cacheKey).isEmpty) + +val
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214913221 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala --- @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.ConcurrentHashMap + +import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener} +import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig} + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._ +import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey + +/** + * Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]]. + * + * This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on + * the class, and same contract applies: after using the borrowed object, you must either call + * returnObject() if the object is healthy to return to pool, or invalidateObject() if the object + * should be destroyed. + * + * The soft capacity of pool is determined by "spark.sql.kafkaConsumerCache.capacity" config value, + * and the pool will have reasonable default value if the value is not provided. + * (The instance will do its best effort to respect soft capacity but it can exceed when there's + * a borrowing request and there's neither free space nor idle object to clear.) + * + * This class guarantees that no caller will get pooled object once the object is borrowed and + * not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]] + * unless caller shares the object to multiple threads. + */ +private[kafka010] class InternalKafkaConsumerPool( +objectFactory: ObjectFactory, +poolConfig: PoolConfig) { + + // the class is intended to have only soft capacity + assert(poolConfig.getMaxTotal < 0) + + private lazy val pool = { +val internalPool = new GenericKeyedObjectPool[CacheKey, InternalKafkaConsumer]( + objectFactory, poolConfig) + internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener) +internalPool + } + + /** + * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no idle object for the key, + * the pool will create the [[InternalKafkaConsumer]] object. + * + * If the pool doesn't have idle object for the key and also exceeds the soft capacity, + * pool will try to clear some of idle objects. 
+ * + * Borrowed object must be returned by either calling returnObject or invalidateObject, otherwise + * the object will be kept in pool as active object. + */ + def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = { +updateKafkaParamForKey(key, kafkaParams) + +if (getTotal == poolConfig.getSoftMaxTotal()) { + pool.clearOldest() +} + +pool.borrowObject(key) + } + + /** Returns borrowed object to the pool. */ + def returnObject(consumer: InternalKafkaConsumer): Unit = { +pool.returnObject(extractCacheKey(consumer), consumer) + } + + /** Invalidates (destroy) borrowed object to the pool. */ + def invalidateObject(consumer: InternalKafkaConsumer): Unit = { +pool.invalidateObject(extractCacheKey(consumer), consumer) + } + + /** Invalidates all idle consumers for the key */ + def invalidateKey(key: CacheKey): Unit = { +pool.clear(key) + } + + /** + * Closes the keyed object pool. Once the pool is closed, + * borrowObject will fail with [[IllegalStateException]], but re
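For readers unfamiliar with Apache Commons Pool 2, a minimal standalone sketch of the keyed-pool contract the scaladoc above relies on; `Key` and `Consumer` are stand-ins for `CacheKey` and `InternalKafkaConsumer`, and the configuration is illustrative.

```scala
import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject}
import org.apache.commons.pool2.impl.{DefaultPooledObject, GenericKeyedObjectPool}

final case class Key(groupId: String, partition: Int)
final class Consumer(val key: Key) { def close(): Unit = () }

// The factory tells the keyed pool how to create, wrap and destroy objects for a key.
class ConsumerFactory extends BaseKeyedPooledObjectFactory[Key, Consumer] {
  override def create(key: Key): Consumer = new Consumer(key)
  override def wrap(value: Consumer): PooledObject[Consumer] = new DefaultPooledObject(value)
  override def destroyObject(key: Key, p: PooledObject[Consumer]): Unit = p.getObject.close()
}

object KeyedPoolContract {
  def main(args: Array[String]): Unit = {
    val pool = new GenericKeyedObjectPool[Key, Consumer](new ConsumerFactory)
    pool.setMaxTotal(-1) // no hard cap; the Spark pool enforces only a "soft" capacity itself

    val key = Key("testgroup", 0)
    val consumer = pool.borrowObject(key) // creates a fresh object if no idle one exists for the key
    try {
      // use the consumer; the pool will not hand it to another caller until it is returned
    } finally {
      pool.returnObject(key, consumer) // or invalidateObject(key, consumer) if it should be destroyed
    }
  }
}
```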
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214916741 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -18,222 +18,247 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} +import java.io.Closeable import java.util.concurrent.TimeoutException import scala.collection.JavaConverters._ import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException} import org.apache.kafka.common.TopicPartition -import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.internal.Logging -import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange +import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET} import org.apache.spark.sql.kafka010.KafkaSourceProvider._ -import org.apache.spark.util.UninterruptibleThread +import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread} + +/** + * This class simplifies the usages of Kafka consumer in Spark SQL Kafka connector. + * + * NOTE: Like KafkaConsumer, this class is not thread-safe. + * NOTE for contributors: It is possible for the instance to be used from multiple callers, + * so all the methods should not rely on current cursor and use seek manually. + */ +private[kafka010] class InternalKafkaConsumer( +val topicPartition: TopicPartition, +val kafkaParams: ju.Map[String, Object]) extends Closeable with Logging { + + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] + + private val consumer = createConsumer -private[kafka010] sealed trait KafkaDataConsumer { /** - * Get the record for the given offset if available. - * - * If the record is invisible (either a - * transaction message, or an aborted message when the consumer's `isolation.level` is - * `read_committed`), it will be skipped and this method will try to fetch next available record - * within [offset, untilOffset). - * - * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will - * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this - * method will try to fetch next available record within [offset, untilOffset). - * - * When this method tries to skip offsets due to either invisible messages or data loss and - * reaches `untilOffset`, it will return `null`. + * Poll messages from Kafka starting from `offset` and returns a pair of "list of consumer record" + * and "offset after poll". The list of consumer record may be empty if the Kafka consumer fetches + * some messages but all of them are not visible messages (either transaction messages, + * or aborted messages when `isolation.level` is `read_committed`). * - * @param offset the offset to fetch. - * @param untilOffsetthe max offset to fetch. Exclusive. - * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. - * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at - * offset if available, or throw exception.when `failOnDataLoss` is `false`, - * this method will either return record at offset if available, or return - * the next earliest available record less than untilOffset, or null. It - * will not throw any exception. + * @throws OffsetOutOfRangeException if `offset` is out of range. 
+ * @throws TimeoutException if the consumer position is not changed after polling. It means the + * consumer polls nothing before timeout. */ - def get( - offset: Long, - untilOffset: Long, - pollTimeoutMs: Long, - failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss) + def fetch(offset: Long, pollTimeoutMs: Long) + : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = { +// Seek to the offset because we may call seekToBeginning or seekToEnd before this. +seek(offset) +val p = consumer.poll(pollTimeoutMs) +val r = p.records(topicPartition) +logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") +val offsetAfterPoll = consumer.position(topicPartition) +logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling") +val fetch
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214916493 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -18,222 +18,247 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} +import java.io.Closeable import java.util.concurrent.TimeoutException import scala.collection.JavaConverters._ import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord, KafkaConsumer, OffsetOutOfRangeException} import org.apache.kafka.common.TopicPartition -import org.apache.spark.{SparkEnv, SparkException, TaskContext} +import org.apache.spark.TaskContext import org.apache.spark.internal.Logging -import org.apache.spark.sql.kafka010.KafkaDataConsumer.AvailableOffsetRange +import org.apache.spark.sql.kafka010.KafkaDataConsumer.{AvailableOffsetRange, CacheKey, UNKNOWN_OFFSET} import org.apache.spark.sql.kafka010.KafkaSourceProvider._ -import org.apache.spark.util.UninterruptibleThread +import org.apache.spark.util.{ShutdownHookManager, UninterruptibleThread} + +/** + * This class simplifies the usages of Kafka consumer in Spark SQL Kafka connector. + * + * NOTE: Like KafkaConsumer, this class is not thread-safe. + * NOTE for contributors: It is possible for the instance to be used from multiple callers, + * so all the methods should not rely on current cursor and use seek manually. + */ +private[kafka010] class InternalKafkaConsumer( +val topicPartition: TopicPartition, +val kafkaParams: ju.Map[String, Object]) extends Closeable with Logging { + + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] + + private val consumer = createConsumer -private[kafka010] sealed trait KafkaDataConsumer { /** - * Get the record for the given offset if available. - * - * If the record is invisible (either a - * transaction message, or an aborted message when the consumer's `isolation.level` is - * `read_committed`), it will be skipped and this method will try to fetch next available record - * within [offset, untilOffset). - * - * This method also will try its best to detect data loss. If `failOnDataLoss` is `true`, it will - * throw an exception when we detect an unavailable offset. If `failOnDataLoss` is `false`, this - * method will try to fetch next available record within [offset, untilOffset). - * - * When this method tries to skip offsets due to either invisible messages or data loss and - * reaches `untilOffset`, it will return `null`. + * Poll messages from Kafka starting from `offset` and returns a pair of "list of consumer record" + * and "offset after poll". The list of consumer record may be empty if the Kafka consumer fetches + * some messages but all of them are not visible messages (either transaction messages, + * or aborted messages when `isolation.level` is `read_committed`). * - * @param offset the offset to fetch. - * @param untilOffsetthe max offset to fetch. Exclusive. - * @param pollTimeoutMs timeout in milliseconds to poll data from Kafka. - * @param failOnDataLoss When `failOnDataLoss` is `true`, this method will either return record at - * offset if available, or throw exception.when `failOnDataLoss` is `false`, - * this method will either return record at offset if available, or return - * the next earliest available record less than untilOffset, or null. It - * will not throw any exception. + * @throws OffsetOutOfRangeException if `offset` is out of range. 
+ * @throws TimeoutException if the consumer position is not changed after polling. It means the + * consumer polls nothing before timeout. */ - def get( - offset: Long, - untilOffset: Long, - pollTimeoutMs: Long, - failOnDataLoss: Boolean): ConsumerRecord[Array[Byte], Array[Byte]] = { -internalConsumer.get(offset, untilOffset, pollTimeoutMs, failOnDataLoss) + def fetch(offset: Long, pollTimeoutMs: Long) + : (ju.List[ConsumerRecord[Array[Byte], Array[Byte]]], Long) = { +// Seek to the offset because we may call seekToBeginning or seekToEnd before this. +seek(offset) +val p = consumer.poll(pollTimeoutMs) +val r = p.records(topicPartition) +logDebug(s"Polled $groupId ${p.partitions()} ${r.size}") +val offsetAfterPoll = consumer.position(topicPartition) +logDebug(s"Offset changed from $offset to $offsetAfterPoll after polling") +val fetch
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214917536 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/FetchedPoolSuite.scala --- @@ -0,0 +1,299 @@ +/* --- End diff -- Nice catch! Will rename. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214911381 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.TimeUnit + +import scala.collection.mutable + +import org.apache.kafka.clients.consumer.ConsumerRecord + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET} +import org.apache.spark.util.ThreadUtils + +/** + * Provides object pool for [[FetchedData]] which is grouped by [[CacheKey]]. + * + * Along with CacheKey, it receives desired start offset to find cached FetchedData which + * may be stored from previous batch. If it can't find one to match, it will create + * a new FetchedData. + */ +private[kafka010] class FetchedDataPool { + import FetchedDataPool._ + + private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) { +var lastReleasedTimestamp: Long = Long.MaxValue +var lastAcquiredTimestamp: Long = Long.MinValue +var inUse: Boolean = false + +def getObject: FetchedData = fetchedData + } + + private object CachedFetchedData { +def empty(): CachedFetchedData = { + val emptyData = FetchedData( +ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], +UNKNOWN_OFFSET, +UNKNOWN_OFFSET) + + CachedFetchedData(emptyData) +} + } + + private type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData] + + private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = mutable.HashMap.empty + + /** Retrieve internal cache. This method is only for testing. */ + private[kafka010] def getCache: mutable.Map[CacheKey, CachedFetchedDataList] = cache --- End diff -- This is to make sure `cache` itself is not accessible from outside, and when callers access `cache` via `getCache`, they will be noted it should not be used other than testing from scaladoc. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214910337 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.TimeUnit + +import scala.collection.mutable + +import org.apache.kafka.clients.consumer.ConsumerRecord + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET} +import org.apache.spark.util.ThreadUtils + +/** + * Provides object pool for [[FetchedData]] which is grouped by [[CacheKey]]. + * + * Along with CacheKey, it receives desired start offset to find cached FetchedData which + * may be stored from previous batch. If it can't find one to match, it will create + * a new FetchedData. + */ +private[kafka010] class FetchedDataPool { + import FetchedDataPool._ + + private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) { +var lastReleasedTimestamp: Long = Long.MaxValue +var lastAcquiredTimestamp: Long = Long.MinValue +var inUse: Boolean = false + +def getObject: FetchedData = fetchedData + } + + private object CachedFetchedData { +def empty(): CachedFetchedData = { + val emptyData = FetchedData( +ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], +UNKNOWN_OFFSET, +UNKNOWN_OFFSET) + + CachedFetchedData(emptyData) +} + } + + private type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData] + + private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = mutable.HashMap.empty + + /** Retrieve internal cache. This method is only for testing. */ + private[kafka010] def getCache: mutable.Map[CacheKey, CachedFetchedDataList] = cache + + private val (minEvictableIdleTimeMillis, evictorThreadRunIntervalMillis): (Long, Long) = { +val conf = SparkEnv.get.conf + +val minEvictIdleTime = conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS, + DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS) + +val evictorThreadInterval = conf.getLong( + CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS, + DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS) + +(minEvictIdleTime, evictorThreadInterval) + } + + private val executorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +"kafka-fetched-data--cache-evictor") + + private def startEvictorThread(): Unit = { +executorService.scheduleAtFixedRate(new Runnable() { + override def run(): Unit = { +removeIdleFetchedData() --- End diff -- Nice catch! Will address. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
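A generic sketch of the periodic-evictor pattern that diff uses, built only on JDK scheduling plus Scala's `NonFatal`; the thread name, interval, and logging are illustrative. One detail worth noting: per the `ScheduledExecutorService` contract, an exception escaping the task suppresses all subsequent runs, so the body is wrapped defensively.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

// Plain-JDK stand-in for ThreadUtils.newDaemonSingleThreadScheduledExecutor.
def daemonScheduler(name: String): ScheduledExecutorService =
  Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, name)
      t.setDaemon(true)
      t
    }
  })

def removeIdleFetchedData(): Unit = {
  // scan the cache and drop entries idle longer than minEvictableIdleTimeMillis (elided)
}

val evictor = daemonScheduler("kafka-fetched-data-cache-evictor")
evictor.scheduleAtFixedRate(new Runnable {
  override def run(): Unit =
    try removeIdleFetchedData()
    catch { case NonFatal(e) => Console.err.println(s"Failed to evict idle fetched data: $e") }
}, 0, 1000, TimeUnit.MILLISECONDS)
```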
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214917284 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -414,17 +468,37 @@ private[kafka010] case class InternalKafkaConsumer( } } - /** Create a new consumer and reset cached states */ - private def resetConsumer(): Unit = { -consumer.close() -consumer = createConsumer -fetchedData.reset() + /** + * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be + * empty if the Kafka consumer fetches some messages but all of them are not visible messages + * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`). + * + * @throws OffsetOutOfRangeException if `offset` is out of range. + * @throws TimeoutException if the consumer position is not changed after polling. It means the + * consumer polls nothing before timeout. + */ + private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = { +val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs) +fetchedData.withNewPoll(records.listIterator, offsetAfterPoll) + } + + private def ensureConsumerAvailable(): Unit = { +if (consumer == null) { --- End diff -- This is defined as `var` so just to avoid additional wrapping here. Same here as above: if we prefer Option I'm happy to change but not sure about it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214910482 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.TimeUnit + +import scala.collection.mutable + +import org.apache.kafka.clients.consumer.ConsumerRecord + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET} +import org.apache.spark.util.ThreadUtils + +/** + * Provides object pool for [[FetchedData]] which is grouped by [[CacheKey]]. + * + * Along with CacheKey, it receives desired start offset to find cached FetchedData which + * may be stored from previous batch. If it can't find one to match, it will create + * a new FetchedData. + */ +private[kafka010] class FetchedDataPool { + import FetchedDataPool._ + + private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) { +var lastReleasedTimestamp: Long = Long.MaxValue +var lastAcquiredTimestamp: Long = Long.MinValue +var inUse: Boolean = false + +def getObject: FetchedData = fetchedData + } + + private object CachedFetchedData { +def empty(): CachedFetchedData = { + val emptyData = FetchedData( +ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], +UNKNOWN_OFFSET, +UNKNOWN_OFFSET) + + CachedFetchedData(emptyData) +} + } + + private type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData] + + private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = mutable.HashMap.empty + + /** Retrieve internal cache. This method is only for testing. */ + private[kafka010] def getCache: mutable.Map[CacheKey, CachedFetchedDataList] = cache + + private val (minEvictableIdleTimeMillis, evictorThreadRunIntervalMillis): (Long, Long) = { +val conf = SparkEnv.get.conf + +val minEvictIdleTime = conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS, + DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS) + +val evictorThreadInterval = conf.getLong( + CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS, + DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS) + +(minEvictIdleTime, evictorThreadInterval) + } + + private val executorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +"kafka-fetched-data--cache-evictor") --- End diff -- Will address. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214910433 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.TimeUnit + +import scala.collection.mutable + +import org.apache.kafka.clients.consumer.ConsumerRecord + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET} +import org.apache.spark.util.ThreadUtils + +/** + * Provides object pool for [[FetchedData]] which is grouped by [[CacheKey]]. + * + * Along with CacheKey, it receives desired start offset to find cached FetchedData which + * may be stored from previous batch. If it can't find one to match, it will create + * a new FetchedData. + */ +private[kafka010] class FetchedDataPool { + import FetchedDataPool._ + + private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) { +var lastReleasedTimestamp: Long = Long.MaxValue +var lastAcquiredTimestamp: Long = Long.MinValue +var inUse: Boolean = false + +def getObject: FetchedData = fetchedData + } + + private object CachedFetchedData { +def empty(): CachedFetchedData = { + val emptyData = FetchedData( +ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], +UNKNOWN_OFFSET, +UNKNOWN_OFFSET) + + CachedFetchedData(emptyData) +} + } + + private type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData] + + private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = mutable.HashMap.empty + + /** Retrieve internal cache. This method is only for testing. */ + private[kafka010] def getCache: mutable.Map[CacheKey, CachedFetchedDataList] = cache + + private val (minEvictableIdleTimeMillis, evictorThreadRunIntervalMillis): (Long, Long) = { +val conf = SparkEnv.get.conf + +val minEvictIdleTime = conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS, + DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS) + +val evictorThreadInterval = conf.getLong( + CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS, + DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS) + +(minEvictIdleTime, evictorThreadInterval) + } + + private val executorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +"kafka-fetched-data--cache-evictor") + + private def startEvictorThread(): Unit = { +executorService.scheduleAtFixedRate(new Runnable() { --- End diff -- Will address. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214907878 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala --- @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.ConcurrentHashMap + +import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener} +import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig} + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._ +import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey + +/** + * Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]]. + * + * This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on + * the class, and same contract applies: after using the borrowed object, you must either call + * returnObject() if the object is healthy to return to pool, or invalidateObject() if the object + * should be destroyed. + * + * The soft capacity of pool is determined by "spark.sql.kafkaConsumerCache.capacity" config value, + * and the pool will have reasonable default value if the value is not provided. + * (The instance will do its best effort to respect soft capacity but it can exceed when there's + * a borrowing request and there's neither free space nor idle object to clear.) + * + * This class guarantees that no caller will get pooled object once the object is borrowed and + * not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]] + * unless caller shares the object to multiple threads. + */ +private[kafka010] class InternalKafkaConsumerPool( +objectFactory: ObjectFactory, +poolConfig: PoolConfig) { + + // the class is intended to have only soft capacity + assert(poolConfig.getMaxTotal < 0) + + private lazy val pool = { +val internalPool = new GenericKeyedObjectPool[CacheKey, InternalKafkaConsumer]( + objectFactory, poolConfig) + internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener) +internalPool + } + + /** + * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no idle object for the key, + * the pool will create the [[InternalKafkaConsumer]] object. + * + * If the pool doesn't have idle object for the key and also exceeds the soft capacity, + * pool will try to clear some of idle objects. 
+ * + * Borrowed object must be returned by either calling returnObject or invalidateObject, otherwise + * the object will be kept in pool as active object. + */ + def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = { +updateKafkaParamForKey(key, kafkaParams) + +if (getTotal == poolConfig.getSoftMaxTotal()) { + pool.clearOldest() +} + +pool.borrowObject(key) + } + + /** Returns borrowed object to the pool. */ + def returnObject(consumer: InternalKafkaConsumer): Unit = { +pool.returnObject(extractCacheKey(consumer), consumer) + } + + /** Invalidates (destroy) borrowed object to the pool. */ + def invalidateObject(consumer: InternalKafkaConsumer): Unit = { +pool.invalidateObject(extractCacheKey(consumer), consumer) + } + + /** Invalidates all idle consumers for the key */ + def invalidateKey(key: CacheKey): Unit = { +pool.clear(key) + } + + /** + * Closes the keyed object pool. Once the pool is closed, + * borrowObject will fail with [[IllegalStateException]], but re
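A hedged sketch of the borrow/return/invalidate contract described in the scaladoc above; the helper name and the way the pool, key and kafkaParams are obtained are assumptions for illustration, not code from the PR:

```scala
import java.{util => ju}

// Borrowed objects must be either returned (healthy) or invalidated (broken),
// otherwise they stay counted as active in the pool.
def withBorrowedConsumer[T](
    pool: InternalKafkaConsumerPool,
    key: KafkaDataConsumer.CacheKey,
    kafkaParams: ju.Map[String, Object])(body: InternalKafkaConsumer => T): T = {
  val consumer = pool.borrowObject(key, kafkaParams)
  try {
    val result = body(consumer)
    pool.returnObject(consumer)        // healthy object goes back to the pool for reuse
    result
  } catch {
    case e: Throwable =>
      pool.invalidateObject(consumer)  // failed object is destroyed instead of reused
      throw e
  }
}
```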
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214908731 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/InternalKafkaConsumerPool.scala --- @@ -0,0 +1,241 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.ConcurrentHashMap + +import org.apache.commons.pool2.{BaseKeyedPooledObjectFactory, PooledObject, SwallowedExceptionListener} +import org.apache.commons.pool2.impl.{DefaultEvictionPolicy, DefaultPooledObject, GenericKeyedObjectPool, GenericKeyedObjectPoolConfig} + +import org.apache.spark.SparkEnv +import org.apache.spark.internal.Logging +import org.apache.spark.sql.kafka010.InternalKafkaConsumerPool._ +import org.apache.spark.sql.kafka010.KafkaDataConsumer.CacheKey + +/** + * Provides object pool for [[InternalKafkaConsumer]] which is grouped by [[CacheKey]]. + * + * This class leverages [[GenericKeyedObjectPool]] internally, hence providing methods based on + * the class, and same contract applies: after using the borrowed object, you must either call + * returnObject() if the object is healthy to return to pool, or invalidateObject() if the object + * should be destroyed. + * + * The soft capacity of pool is determined by "spark.sql.kafkaConsumerCache.capacity" config value, + * and the pool will have reasonable default value if the value is not provided. + * (The instance will do its best effort to respect soft capacity but it can exceed when there's + * a borrowing request and there's neither free space nor idle object to clear.) + * + * This class guarantees that no caller will get pooled object once the object is borrowed and + * not yet returned, hence provide thread-safety usage of non-thread-safe [[InternalKafkaConsumer]] + * unless caller shares the object to multiple threads. + */ +private[kafka010] class InternalKafkaConsumerPool( +objectFactory: ObjectFactory, +poolConfig: PoolConfig) { + + // the class is intended to have only soft capacity + assert(poolConfig.getMaxTotal < 0) + + private lazy val pool = { +val internalPool = new GenericKeyedObjectPool[CacheKey, InternalKafkaConsumer]( + objectFactory, poolConfig) + internalPool.setSwallowedExceptionListener(CustomSwallowedExceptionListener) +internalPool + } + + /** + * Borrows [[InternalKafkaConsumer]] object from the pool. If there's no idle object for the key, + * the pool will create the [[InternalKafkaConsumer]] object. + * + * If the pool doesn't have idle object for the key and also exceeds the soft capacity, + * pool will try to clear some of idle objects. 
+ * + * Borrowed object must be returned by either calling returnObject or invalidateObject, otherwise + * the object will be kept in pool as active object. + */ + def borrowObject(key: CacheKey, kafkaParams: ju.Map[String, Object]): InternalKafkaConsumer = { +updateKafkaParamForKey(key, kafkaParams) + +if (getTotal == poolConfig.getSoftMaxTotal()) { + pool.clearOldest() +} + +pool.borrowObject(key) + } + + /** Returns borrowed object to the pool. */ + def returnObject(consumer: InternalKafkaConsumer): Unit = { +pool.returnObject(extractCacheKey(consumer), consumer) + } + + /** Invalidates (destroy) borrowed object to the pool. */ + def invalidateObject(consumer: InternalKafkaConsumer): Unit = { +pool.invalidateObject(extractCacheKey(consumer), consumer) + } + + /** Invalidates all idle consumers for the key */ + def invalidateKey(key: CacheKey): Unit = { +pool.clear(key) + } + + /** + * Closes the keyed object pool. Once the pool is closed, + * borrowObject will fail with [[IllegalStateException]], but re
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214917336 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaDataConsumer.scala --- @@ -414,17 +468,37 @@ private[kafka010] case class InternalKafkaConsumer( } } - /** Create a new consumer and reset cached states */ - private def resetConsumer(): Unit = { -consumer.close() -consumer = createConsumer -fetchedData.reset() + /** + * Poll messages from Kafka starting from `offset` and update `fetchedData`. `fetchedData` may be + * empty if the Kafka consumer fetches some messages but all of them are not visible messages + * (either transaction messages, or aborted messages when `isolation.level` is `read_committed`). + * + * @throws OffsetOutOfRangeException if `offset` is out of range. + * @throws TimeoutException if the consumer position is not changed after polling. It means the + * consumer polls nothing before timeout. + */ + private def fetchData(offset: Long, pollTimeoutMs: Long): Unit = { +val (records, offsetAfterPoll) = consumer.fetch(offset, pollTimeoutMs) +fetchedData.withNewPoll(records.listIterator, offsetAfterPoll) + } + + private def ensureConsumerAvailable(): Unit = { +if (consumer == null) { + consumer = consumerPool.borrowObject(cacheKey, kafkaParams) +} + } + + private def ensureFetchedDataAvailable(offset: Long): Unit = { +if (fetchedData == null) { --- End diff -- Same here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22138: [SPARK-25151][SS] Apply Apache Commons Pool to Ka...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/22138#discussion_r214909826 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/FetchedDataPool.scala --- @@ -0,0 +1,160 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.{util => ju} +import java.util.concurrent.TimeUnit + +import scala.collection.mutable + +import org.apache.kafka.clients.consumer.ConsumerRecord + +import org.apache.spark.SparkEnv +import org.apache.spark.sql.kafka010.KafkaDataConsumer.{CacheKey, UNKNOWN_OFFSET} +import org.apache.spark.util.ThreadUtils + +/** + * Provides object pool for [[FetchedData]] which is grouped by [[CacheKey]]. + * + * Along with CacheKey, it receives desired start offset to find cached FetchedData which + * may be stored from previous batch. If it can't find one to match, it will create + * a new FetchedData. + */ +private[kafka010] class FetchedDataPool { + import FetchedDataPool._ + + private[kafka010] case class CachedFetchedData(fetchedData: FetchedData) { +var lastReleasedTimestamp: Long = Long.MaxValue +var lastAcquiredTimestamp: Long = Long.MinValue +var inUse: Boolean = false + +def getObject: FetchedData = fetchedData + } + + private object CachedFetchedData { +def empty(): CachedFetchedData = { + val emptyData = FetchedData( +ju.Collections.emptyListIterator[ConsumerRecord[Array[Byte], Array[Byte]]], +UNKNOWN_OFFSET, +UNKNOWN_OFFSET) + + CachedFetchedData(emptyData) +} + } + + private type CachedFetchedDataList = mutable.ListBuffer[CachedFetchedData] + + private val cache: mutable.Map[CacheKey, CachedFetchedDataList] = mutable.HashMap.empty + + /** Retrieve internal cache. This method is only for testing. 
*/ + private[kafka010] def getCache: mutable.Map[CacheKey, CachedFetchedDataList] = cache + + private val (minEvictableIdleTimeMillis, evictorThreadRunIntervalMillis): (Long, Long) = { +val conf = SparkEnv.get.conf + +val minEvictIdleTime = conf.getLong(CONFIG_NAME_MIN_EVICTABLE_IDLE_TIME_MILLIS, + DEFAULT_VALUE_MIN_EVICTABLE_IDLE_TIME_MILLIS) + +val evictorThreadInterval = conf.getLong( + CONFIG_NAME_EVICTOR_THREAD_RUN_INTERVAL_MILLIS, + DEFAULT_VALUE_EVICTOR_THREAD_RUN_INTERVAL_MILLIS) + +(minEvictIdleTime, evictorThreadInterval) + } + + private val executorService = ThreadUtils.newDaemonSingleThreadScheduledExecutor( +"kafka-fetched-data--cache-evictor") + + private def startEvictorThread(): Unit = { +executorService.scheduleAtFixedRate(new Runnable() { + override def run(): Unit = { +removeIdleFetchedData() + } +}, 0, evictorThreadRunIntervalMillis, TimeUnit.MILLISECONDS) + } + + startEvictorThread() + + def acquire(key: CacheKey, desiredStartOffset: Long): FetchedData = synchronized { +val fetchedDataList = cache.getOrElseUpdate(key, new CachedFetchedDataList()) + +val cachedFetchedDataOption = fetchedDataList.find { p => + !p.inUse && p.getObject.nextOffsetInFetchedData == desiredStartOffset +} + +var cachedFetchedData: CachedFetchedData = null +if (cachedFetchedDataOption.isDefined) { + cachedFetchedData = cachedFetchedDataOption.get +} else { + cachedFetchedData = CachedFetchedData.empty() + fetchedDataList += cachedFetchedData +} + +cachedFetchedData.lastAcquiredTimestamp = System.currentTimeMillis() +cachedFetchedData.inUse = true + +cachedFetchedData.getObject + } + + def invalidate(key: CacheKey): Unit = synchronized { +cache.remove(key) + } + + def release(key: CacheKey, fetchedData: FetchedData): Unit = s
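A hedged sketch of the acquire/release cycle implemented by the `FetchedDataPool` code above; the helper name and the way the pool, key and offset are obtained are assumptions for illustration:

```scala
def withFetchedData[T](
    pool: FetchedDataPool,
    key: KafkaDataConsumer.CacheKey,
    startOffset: Long)(body: FetchedData => T): T = {
  // acquire() reuses an idle FetchedData whose next offset matches startOffset,
  // or creates an empty one when no cached entry fits.
  val data = pool.acquire(key, startOffset)
  try {
    body(data)
  } finally {
    pool.release(key, data)  // mark it idle so the next batch (or the evictor) can pick it up
  }
}
```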
[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22328 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95668/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22277: [SPARK-25276] Redundant constrains when using alias
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22277 Thank you for your interest in this issue. However, I don't think the changes proposed in this PR are valid: consider the case where you have another predicate like `a > z`; it is surely desirable to infer a new constraint `z > z`. Please correct me if I'm wrong about this. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
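To make the counter-example concrete, a hedged sketch of the kind of plan being discussed; the dataset and column names are invented for illustration, not taken from the PR:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("alias-constraints").getOrCreate()
import spark.implicits._

// z is a plain alias of a, and the filter references both columns.
val df = spark.range(10).toDF("a").select($"a", $"a".as("z"))
val filtered = df.where($"a" > $"z")

// From the predicate a > z and the alias z := a, constraint propagation can also
// infer z > z; that inferred constraint is useful to the optimizer (the filter can
// never be satisfied), so alias-derived constraints are not always redundant.
filtered.explain(true)
```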
[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22328 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22328: [SPARK-22666][ML][SQL] Spark datasource for image format
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22328 **[Test build #95668 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95668/testReport)** for PR 22328 at commit [`5164d19`](https://github.com/apache/spark/commit/5164d19deb447e49e87d20f5a1efbb0c2ee177ee). * This patch **fails PySpark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22171: [SPARK-25177][SQL] When dataframe decimal type column ha...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22171 **[Test build #95675 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95675/testReport)** for PR 22171 at commit [`6b4c2f2`](https://github.com/apache/spark/commit/6b4c2f24c500bb972b5ffd14897bfb5fb2184ffc). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22171: [SPARK-25177][SQL] When dataframe decimal type column ha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22171 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22171: [SPARK-25177][SQL] When dataframe decimal type column ha...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22171 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2839/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22171: [SPARK-25177][SQL] When dataframe decimal type column ha...
Github user vinodkc commented on the issue: https://github.com/apache/spark/pull/22171 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22219: [SPARK-25224][SQL] Improvement of Spark SQL Thrif...
Github user Dooyoung-Hwang commented on a diff in the pull request: https://github.com/apache/spark/pull/22219#discussion_r214908763 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala --- @@ -3237,6 +3238,28 @@ class Dataset[T] private[sql]( files.toSet.toArray } + /** + * Returns the tuple of the row count and an SeqView that contains all rows in this Dataset. + * + * The SeqView will consume as much memory as the total size of serialized results which can be + * limited with the config 'spark.driver.maxResultSize'. Rows are deserialized when iterating rows + * with iterator of returned SeqView. Whether to collect all deserialized rows or to iterate them + * incrementally can be decided with considering total rows count and driver memory. + */ + private[sql] def collectCountAndSeqView(): (Long, SeqView[T, Array[T]]) = +withAction("collectCountAndSeqView", queryExecution) { plan => + // This projection writes output to a `InternalRow`, which means applying this projection is + // not thread-safe. Here we create the projection inside this method to make `Dataset` + // thread-safe. + val objProj = GenerateSafeProjection.generate(deserializer :: Nil) + val (totalRowCount, internalRowsView) = plan.executeCollectSeqView() + (totalRowCount, internalRowsView.map { row => +// The row returned by SafeProjection is `SpecificInternalRow`, which ignore the data type +// parameter of its `get` method, so it's safe to use null here. +objProj(row).get(0, null).asInstanceOf[T] + }.asInstanceOf[SeqView[T, Array[T]]]) +} --- End diff -- Yes, that's what I mean. I thought that `deserializer` is declared as private, so there is no way to get `deserializer` out of Dataset. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
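A hedged sketch of how a caller inside `org.apache.spark.sql` (the method is `private[sql]`) might consume the proposed API; the row-count threshold and the `sendToClient` handler are invented for illustration:

```scala
// `ds` is assumed to be a Dataset built elsewhere; `sendToClient` is a stand-in for
// whatever the Thrift server does with each row.
val (rowCount, rowsView) = ds.collectCountAndSeqView()

if (rowCount <= 10000) {
  // Small result: force the view and materialize every deserialized row at once.
  rowsView.force.foreach(sendToClient)
} else {
  // Large result: iterate the view so rows are deserialized incrementally; only the
  // serialized batches (bounded by spark.driver.maxResultSize) stay resident.
  rowsView.iterator.foreach(sendToClient)
}
```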
[GitHub] spark issue #22327: [SPARK-25330][BUILD] Revert Hadoop 2.7 to 2.7.3
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22327 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22327: [SPARK-25330][BUILD] Revert Hadoop 2.7 to 2.7.3
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22327 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95664/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22327: [SPARK-25330][BUILD] Revert Hadoop 2.7 to 2.7.3
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22327 **[Test build #95664 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95664/testReport)** for PR 22327 at commit [`f89448b`](https://github.com/apache/spark/commit/f89448b7b0598a59f750a324e869e7768cfedbc1). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recycle Spar...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/22330 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recyc...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/22330#discussion_r214903740 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/TakeOrderedAndProjectSuite.scala --- @@ -45,6 +45,7 @@ class TakeOrderedAndProjectSuite extends SparkPlanTest with SharedSQLContext { protected override def afterAll() = { SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, originalLimitFlatGlobalLimit) +super.afterAll() --- End diff -- Good catch! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
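A common variant of the same fix (a sketch, not what the PR actually does) wraps the conf reset in try/finally so the shared SparkSession is stopped even if the reset itself throws:

```scala
protected override def afterAll(): Unit = {
  try {
    SQLConf.get.setConf(SQLConf.LIMIT_FLAT_GLOBAL_LIMIT, originalLimitFlatGlobalLimit)
  } finally {
    super.afterAll()  // SharedSQLContext tears down the shared SparkSession here
  }
}
```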
[GitHub] spark issue #22313: [SPARK-25306][SQL] Avoid skewed filter trees to speed up...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22313 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22313: [SPARK-25306][SQL] Avoid skewed filter trees to speed up...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22313 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95669/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22313: [SPARK-25306][SQL] Avoid skewed filter trees to speed up...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22313 **[Test build #95669 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95669/testReport)** for PR 22313 at commit [`3cd4443`](https://github.com/apache/spark/commit/3cd444306c3b8b6c42a74b7cfb0755b8ce209c84). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22141: [SPARK-25154][SQL] Support NOT IN sub-queries inside nes...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/22141 @dilipbiswal Any update? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19691: [SPARK-14922][SPARK-17732][SQL]ALTER TABLE DROP PARTITIO...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19691 @DazhuangSu still busy? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/22240 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22048: [SPARK-25108][SQL] Fix the show method to display the wi...
Github user xuejianbest commented on the issue: https://github.com/apache/spark/pull/22048 I see, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22316: [SPARK-25048][SQL] Pivoting by multiple columns i...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/22316#discussion_r214894498 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/DataFramePivotSuite.scala --- @@ -308,4 +308,27 @@ class DataFramePivotSuite extends QueryTest with SharedSQLContext { assert(exception.getMessage.contains("aggregate functions are not allowed")) } + + test("pivoting column list with values") { +val expected = Row(2012, 1.0, null) :: Row(2013, 48000.0, 3.0) :: Nil +val df = trainingSales + .groupBy($"sales.year") + .pivot(struct(lower($"sales.course"), $"training"), Seq( +struct(lit("dotnet"), lit("Experts")), +struct(lit("java"), lit("Dummies"))) + ).agg(sum($"sales.earnings")) + +checkAnswer(df, expected) + } + + test("pivoting column list") { +val exception = intercept[RuntimeException] { + trainingSales +.groupBy($"sales.year") +.pivot(struct(lower($"sales.course"), $"training")) +.agg(sum($"sales.earnings")) +.collect() --- End diff -- I think invalid queries basically throw `AnalysisException`. But, yea, indeed, we'd better keep the current behaviour. Thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recycle Spar...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22330 **[Test build #95674 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95674/testReport)** for PR 22330 at commit [`0b01066`](https://github.com/apache/spark/commit/0b010669b15781a648f7c7bde13556ddb7c003c3). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recycle Spar...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22330 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2838/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recycle Spar...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22330 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22330: [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recyc...
GitHub user jiangxb1987 opened a pull request: https://github.com/apache/spark/pull/22330 [SPARK-19355][SQL][FOLLOWUP][TEST] Properly recycle SparkSession on TakeOrderedAndProjectSuite finishes ## What changes were proposed in this pull request? Previously in `TakeOrderedAndProjectSuite` the SparkSession would not get recycled when the test suite finishes. ## How was this patch tested? N/A You can merge this pull request into a Git repository by running: $ git pull https://github.com/jiangxb1987/spark SPARK-19355 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22330.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22330 commit 0b010669b15781a648f7c7bde13556ddb7c003c3 Author: Xingbo Jiang Date: 2018-09-04T12:23:30Z properly recycle SparkSession on TakeOrderedAndProjectSuite finishes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22320: [SPARK-25313][SQL]Fix regression in FileFormatWriter out...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22320 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22320: [SPARK-25313][SQL]Fix regression in FileFormatWriter out...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22320 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95663/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22320: [SPARK-25313][SQL]Fix regression in FileFormatWriter out...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22320 **[Test build #95663 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95663/testReport)** for PR 22320 at commit [`3ca072d`](https://github.com/apache/spark/commit/3ca072d18474d1536c3ac729fe1e0b79cd855cca). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22240 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22240 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95661/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22240: [SPARK-25248] [CORE] Audit barrier Scala APIs for 2.4
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22240 **[Test build #95661 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95661/testReport)** for PR 22240 at commit [`9b6a47b`](https://github.com/apache/spark/commit/9b6a47bf718309eb0b5a22a0282a5a7c4226e991). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22179: [SPARK-23131][BUILD] Upgrade Kryo to 4.0.2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22179 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22179: [SPARK-23131][BUILD] Upgrade Kryo to 4.0.2
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22179 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2837/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22179: [SPARK-23131][BUILD] Upgrade Kryo to 4.0.2
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22179 **[Test build #95673 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95673/testReport)** for PR 22179 at commit [`d1dac1e`](https://github.com/apache/spark/commit/d1dac1e75d450ca26a460cb36f8ac0684a251c64). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22179: [SPARK-23131][BUILD] Upgrade Kryo to 4.0.2
Github user wangyum commented on the issue: https://github.com/apache/spark/pull/22179 Thanks, @dongjoon-hyun --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22321: [DOC] Update some outdated links
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22321 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22321: [DOC] Update some outdated links
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/22321 Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22326 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95662/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22282 **[Test build #95672 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95672/testReport)** for PR 22282 at commit [`253a894`](https://github.com/apache/spark/commit/253a894bedbd3a9642e529ad937dcb99dae346c7). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22326 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22326: [SPARK-25314][SQL] Fix Python UDF accessing attibutes fr...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22326 **[Test build #95662 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95662/testReport)** for PR 22326 at commit [`d58f3a5`](https://github.com/apache/spark/commit/d58f3a58f5be3efaab7e56b353ce0dfeb7610f7d). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22321: [DOC] Update some outdated links
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22321 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/95656/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22321: [DOC] Update some outdated links
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22321 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22321: [DOC] Update some outdated links
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22321 **[Test build #95656 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95656/testReport)** for PR 22321 at commit [`d9bbf3c`](https://github.com/apache/spark/commit/d9bbf3c4a7be82d66eb643a42c2724cd30ea1ad5). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22226: [SPARK-25252][SQL] Support arrays of any types by to_jso...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/6 **[Test build #95671 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95671/testReport)** for PR 6 at commit [`90c9687`](https://github.com/apache/spark/commit/90c968772d74c8aadc7a4d0e74e226554b921486). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22282: [SPARK-23539][SS] Add support for Kafka headers in Struc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22282 **[Test build #95670 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95670/testReport)** for PR 22282 at commit [`2254009`](https://github.com/apache/spark/commit/22540092f7a786585afeb5a861b1c329722e3d0b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22226: [SPARK-25252][SQL] Support arrays of any types by...
Github user MaxGekk commented on a diff in the pull request: https://github.com/apache/spark/pull/6#discussion_r214878373 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/jsonExpressions.scala --- @@ -663,12 +662,7 @@ case class StructsToJson( rowSchema, writer, new JSONOptions(options, timeZoneId.get)) @transient - lazy val rowSchema = child.dataType match { -case st: StructType => st -case ArrayType(st: StructType, _) => st -case mt: MapType => mt -case ArrayType(mt: MapType, _) => mt - } + lazy val rowSchema = child.dataType --- End diff -- I tried to remove `lazy` and got many errors on tests like: ``` Invalid call to dataType on unresolved object, tree: 'a org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to dataType on unresolved object, tree: 'a at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.dataType(unresolved.scala:105) at org.apache.spark.sql.catalyst.expressions.StructsToJson.(jsonExpressions.scala:665) ``` If you don't mind, I will keep it `lazy`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
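A minimal sketch (simplified stand-ins, not the actual Catalyst classes) of why the field has to stay lazy: the instance built at parse time holds an unresolved child, and only the analyzed copy, whose child is resolved, ever forces `rowSchema`:

```scala
sealed trait Expr { def dataType: String }
case object UnresolvedAttr extends Expr {
  def dataType: String =
    throw new UnsupportedOperationException("Invalid call to dataType on unresolved object")
}
case class ResolvedAttr(dataType: String) extends Expr

case class StructsToJsonLike(child: Expr) {
  lazy val rowSchema: String = child.dataType  // an eager `val` would throw for the parsed instance
}

val parsed   = StructsToJsonLike(UnresolvedAttr)           // ok: rowSchema is never forced here
val analyzed = parsed.copy(child = ResolvedAttr("array"))  // analyzer swaps in a resolved child
println(analyzed.rowSchema)                                // prints "array"
```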
[GitHub] spark issue #22327: [SPARK-25330][BUILD] Revert Hadoop 2.7 to 2.7.3
Github user wangyum commented on the issue: https://github.com/apache/spark/pull/22327 Yes. This is a Hadoop thing. I tried to build Hadoop 2.7.7 with [`Configuration.getRestrictParserDefault(Object resource)`](https://github.com/apache/hadoop/blob/release-2.7.7-RC0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/conf/Configuration.java#L236) = true and false. It succeeded when `Configuration.getRestrictParserDefault(Object resource)=false`, but failed when `Configuration.getRestrictParserDefault(Object resource)=true`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22321: [DOC] Update some outdated links
Github user dongjoon-hyun commented on the issue: https://github.com/apache/spark/pull/22321 I see. I'll try after Jenkins passes. Thank you, @HyukjinKwon . --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22313: [SPARK-25306][SQL] Avoid skewed filter trees to speed up...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/22313 **[Test build #95669 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/95669/testReport)** for PR 22313 at commit [`3cd4443`](https://github.com/apache/spark/commit/3cd444306c3b8b6c42a74b7cfb0755b8ce209c84). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22313: [SPARK-25306][SQL] Avoid skewed filter trees to speed up...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22313 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/2836/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22313: [SPARK-25306][SQL] Avoid skewed filter trees to speed up...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22313 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org