[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21895 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21895 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94268/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21895 **[Test build #94268 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94268/testReport)** for PR 21895 at commit [`14ae790`](https://github.com/apache/spark/commit/14ae790350b88c9ed64d63bd67cf21a22f8ffdd9). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21988: [SPARK-25003][PYSPARK][BRANCH-2.2] Use SessionExtensions...
Github user RussellSpitzer commented on the issue: https://github.com/apache/spark/pull/21988 @felixcheung I just didn't know which version to target, so I made a PR for each one. We can just close the ones that shouldn't be merged. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21989: [SPARK-25003][PYSPARK][BRANCH-2.3] Use SessionExtensions...
Github user RussellSpitzer commented on the issue: https://github.com/apache/spark/pull/21989 @kiszk sure, it all depends on which branch the merge target should be; I wasn't sure which one was being used for changes of this nature. Technically it's a bug fix, I believe. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22004: [WIP][SPARK-25029][TESTS] Scala 2.12 issues: Task...
Github user adriaanm commented on a diff in the pull request: https://github.com/apache/spark/pull/22004#discussion_r207871809 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2369,39 +2369,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC)) } - test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages" + + test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages " + "still behave correctly on fetch failures") { -// Runs a job that always encounters a fetch failure, so should eventually be aborted --- End diff -- > Waiting for feedback from @lrytz on the bytecode stuff. I assume the "bytecode stuff" means the janino issue? I commented on https://github.com/janino-compiler/janino/issues/47 -- it seems like they don't fully support Java 8 yet. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22004: [WIP][SPARK-25029][TESTS] Scala 2.12 issues: TaskNotSeri...
Github user skonto commented on the issue: https://github.com/apache/spark/pull/22004 @srowen I was able to reproduce and investigate this further, but cleaning in 2.11 does not seem to affect serialization in this case. I moved the serialization check to the beginning of the clean method in the ClosureCleaner, ran it with 2.11, and it passes. Waiting for feedback from @lrytz on the bytecode stuff. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22006: [SPARK-25031][SQL] Fix MapType schema print
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/22006#discussion_r207869680 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/types/MapTypeSuite.scala --- @@ -0,0 +1,47 @@ +/* --- End diff -- I think we can put this in `DataTypeSuite` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22004: [WIP][SPARK-25029][TESTS] Scala 2.12 issues: Task...
Github user skonto commented on a diff in the pull request: https://github.com/apache/spark/pull/22004#discussion_r207868702 --- Diff: core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala --- @@ -2369,39 +2369,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi assert(scheduler.getShuffleDependencies(rddE) === Set(shuffleDepA, shuffleDepC)) } - test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages" + + test("SPARK-17644: After one stage is aborted for too many failed attempts, subsequent stages " + "still behave correctly on fetch failures") { -// Runs a job that always encounters a fetch failure, so should eventually be aborted --- End diff -- I was able to reproduce and investigate this further, but cleaning in 2.11 does not affect serialization in this case. I moved the check to the beginning of the clean method in the ClosureCleaner. Waiting for feedback from @lrytz on the bytecode stuff. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21931: [SPARK-24978][SQL]Add spark.sql.fast.hash.aggregate.row....
Github user heary-cao commented on the issue: https://github.com/apache/spark/pull/21931 @kiszk, I'm not sure what the maximum should be set to; 1G is the largest value accepted for numBuckets, and buckets of that size correspond to about 8G of memory. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21970: [SPARK-24996][SQL] Use DSL in DeclarativeAggregate
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21970 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21970: [SPARK-24996][SQL] Use DSL in DeclarativeAggregate
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21970 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94271/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21970: [SPARK-24996][SQL] Use DSL in DeclarativeAggregate
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21970 **[Test build #94271 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94271/testReport)** for PR 21970 at commit [`9103b9c`](https://github.com/apache/spark/commit/9103b9cc488e032f1be452f9ee5964c7aa7b244c). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21860: [SPARK-24901][SQL]Merge the codegen of RegularHashMap an...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21860 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94270/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21860: [SPARK-24901][SQL]Merge the codegen of RegularHashMap an...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21860 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21860: [SPARK-24901][SQL]Merge the codegen of RegularHashMap an...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21860 **[Test build #94270 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94270/testReport)** for PR 21860 at commit [`dfa549e`](https://github.com/apache/spark/commit/dfa549e4ed782ddfff6df8fdb861d04a7465b0ba). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22006: [SPARK-25031][SQL] Fix MapType schema print
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22006 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22006: [SPARK-25031][SQL] Fix MapType schema print
GitHub user invkrh opened a pull request: https://github.com/apache/spark/pull/22006 [SPARK-25031][SQL] Fix MapType schema print ## What changes were proposed in this pull request? This PR fixes a bug in the `buildFormattedString` function of `MapType`, which makes the printed schema misleading. ## How was this patch tested? Added a unit test. You can merge this pull request into a Git repository by running: $ git pull https://github.com/invkrh/spark fix-map-schema-print Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22006.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22006 commit f1a7e29837c39f626b260af3d62d74be9d0e802f Author: invkrh Date: 2018-08-06T11:24:18Z Fix MapType schema print --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
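A minimal repro sketch (not taken from the PR): `printTreeString()` goes through `buildFormattedString`, so a schema containing a `MapType` shows how the map's key/value types end up being rendered.

```scala
// Hypothetical repro sketch: printTreeString() exercises buildFormattedString,
// so a schema with a nested MapType makes the reported formatting issue visible.
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("m", MapType(StringType, ArrayType(IntegerType)))))
schema.printTreeString()   // inspect how the map's key and value types are indented
```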
[GitHub] spark issue #19635: [SPARK-22413][SQL] Type coercion for IN is not coherent ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19635 kindly ping @gatorsmile @hvanhovell --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21184: [WIP][SPARK-24051][SQL] Replace Aliases with the ...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21184#discussion_r207859370 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala --- @@ -284,6 +288,80 @@ class Analyzer( } } + /** + * Replaces [[Alias]] with the same exprId but different references with [[Alias]] having + * different exprIds. This is a rare situation which can cause incorrect results. + */ + object DeduplicateAliases extends Rule[LogicalPlan] { --- End diff -- kindly ping @cloud-fan --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21102: [SPARK-23913][SQL] Add array_intersect function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21102 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94267/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21102: [SPARK-23913][SQL] Add array_intersect function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21102 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/21403 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21222: [SPARK-24161][SS] Enable debug package feature on...
Github user attilapiros commented on a diff in the pull request: https://github.com/apache/spark/pull/21222#discussion_r207857933 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala --- @@ -513,6 +515,125 @@ class StreamSuite extends StreamTest { } } + test("explain-continuous") { +val inputData = ContinuousMemoryStream[Int] +val df = inputData.toDS().map(_ * 2).filter(_ > 5) + +// Test `df.explain` +val explain = ExplainCommand(df.queryExecution.logical, extended = false) +val explainString = + spark.sessionState +.executePlan(explain) +.executedPlan +.executeCollect() +.map(_.getString(0)) +.mkString("\n") +assert(explainString.contains("Filter")) +assert(explainString.contains("MapElements")) +assert(!explainString.contains("LocalTableScan")) + +// Test StreamingQuery.display +val q = df.writeStream.queryName("memory_continuous_explain") + .outputMode(OutputMode.Update()).format("memory") + .trigger(Trigger.Continuous("1 seconds")) + .start() + .asInstanceOf[StreamingQueryWrapper] + .streamingQuery +try { + // in continuous mode, the query will be run even there's no data + // sleep a bit to ensure initialization + eventually(timeout(2.seconds), interval(100.milliseconds)) { +assert(q.lastExecution != null) + } + + val explainWithoutExtended = q.explainInternal(false) + + // `extended = false` only displays the physical plan. + assert("Streaming RelationV2 ContinuousMemoryStream".r +.findAllMatchIn(explainWithoutExtended).size === 0) + assert("ScanV2 ContinuousMemoryStream".r +.findAllMatchIn(explainWithoutExtended).size === 1) + + val explainWithExtended = q.explainInternal(true) + // `extended = true` displays 3 logical plans (Parsed/Optimized/Optimized) and 1 physical + // plan. + assert("Streaming RelationV2 ContinuousMemoryStream".r +.findAllMatchIn(explainWithExtended).size === 3) + assert("ScanV2 ContinuousMemoryStream".r +.findAllMatchIn(explainWithExtended).size === 1) +} finally { + q.stop() +} + } + + test("codegen-microbatch") { +import org.apache.spark.sql.execution.debug._ + +val inputData = MemoryStream[Int] +val df = inputData.toDS().map(_ * 2).filter(_ > 5) + +// Test StreamingQuery.codegen +val q = df.writeStream.queryName("memory_microbatch_codegen") + .outputMode(OutputMode.Update) + .format("memory") + .trigger(Trigger.ProcessingTime("1 seconds")) + .start() + +try { + assert("No physical plan. Waiting for data." === codegenString(q)) + assert(codegenStringSeq(q).isEmpty) + + inputData.addData(1, 2, 3, 4, 5) + q.processAllAvailable() + + val codegenStr = codegenString(q) --- End diff -- What about to extract the following 8 lines into a private method as the `codegenString` and `codegenStringSeq` result checking are the same for microbatch and continues executions? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
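A rough sketch of the extraction being suggested; the helper name is hypothetical and the assertions are placeholders standing in for the eight shared lines, only `codegenString`/`codegenStringSeq` come from the test itself.

```scala
// Hypothetical helper shared by the microbatch and continuous codegen tests.
// The concrete assertions are placeholders for the duplicated checks.
private def assertCodegenResults(q: StreamingQuery): Unit = {
  import org.apache.spark.sql.execution.debug._
  val codegenStr = codegenString(q)
  assert(codegenStr.nonEmpty)        // placeholder for the shared codegenString checks
  val codegenStrSeq = codegenStringSeq(q)
  assert(codegenStrSeq.nonEmpty)     // placeholder for the shared codegenStringSeq checks
}
```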
[GitHub] spark issue #21102: [SPARK-23913][SQL] Add array_intersect function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21102 **[Test build #94267 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94267/testReport)** for PR 21102 at commit [`33781b6`](https://github.com/apache/spark/commit/33781b640ed447d9a73a93b63e1834dd9360e72a). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21403 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21403 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94269/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21403 **[Test build #94269 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94269/testReport)** for PR 21403 at commit [`eb1dfb7`](https://github.com/apache/spark/commit/eb1dfb7e0873b8479ea54d223b7fde3dcefa4834). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21939: [SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10....
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21939 It sounds like the vote can pass soon. https://lists.apache.org/thread.html/9900da1540be5aafce27691fd40395bb53f465302db29979c154d99a@%3Cdev.arrow.apache.org%3E --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...
Github user heary-cao commented on a diff in the pull request: https://github.com/apache/spark/pull/21860#discussion_r207854432 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala --- @@ -232,6 +232,23 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { } } + test("SPARK-24901 check merge FastHashMap and RegularHashMap generate code max size") { +val caseNumber = 80 +// merge fastHashMap and regularHashMap generate code max size +val codeWithLongFunctions = genGroupByCode(caseNumber) +val (_, maxCodeSize) = CodeGenerator.compile(codeWithLongFunctions) +assert(maxCodeSize < 13500) --- End diff -- yes, the current maxCodeSize: 13370 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21939: [SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10....
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21939 To get this in, we might need to delay the code freeze. Can you reply to the dev list email http://apache-spark-developers-list.1001551.n3.nabble.com/code-freeze-and-branch-cut-for-Apache-Spark-2-4-td24365.html ? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21939: [SPARK-23874][SQL][PYTHON] Upgrade Apache Arrow to 0.10....
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21939 After the code freeze, dependency changes are not allowed. Hopefully, we can make it before then. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22000: [SPARK-25025][SQL] Remove the default value of is...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22000 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21909: [SPARK-24959][SQL] Speed up count() for JSON and ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21909#discussion_r207850329 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala --- @@ -2225,19 +2225,21 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData { test("SPARK-23723: specified encoding is not matched to actual encoding") { -val fileName = "test-data/utf16LE.json" -val schema = new StructType().add("firstName", StringType).add("lastName", StringType) -val exception = intercept[SparkException] { - spark.read.schema(schema) -.option("mode", "FAILFAST") -.option("multiline", "true") -.options(Map("encoding" -> "UTF-16BE")) -.json(testFile(fileName)) -.count() -} -val errMsg = exception.getMessage +withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> "false") { --- End diff -- How about CSV? Could you add the same one too? Also, we need to add the verification logic when the conf is true. ``` Seq(true, false).foreach { optimizeEmptySchema => withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> optimizeEmptySchema.toString) { ... } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
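Roughly, the suggested shape of the test, filled in as a sketch; it is meant to live inside `JsonSuite`, and what should be asserted when the bypass is enabled is still an open assumption.

```scala
// Hypothetical sketch of the suggested structure: run the same scenario with the
// bypass optimization on and off, and assert the behaviour for each setting.
Seq(true, false).foreach { bypassParser =>
  withSQLConf(SQLConf.BYPASS_PARSER_FOR_EMPTY_SCHEMA.key -> bypassParser.toString) {
    val fileName = "test-data/utf16LE.json"
    val schema = new StructType().add("firstName", StringType).add("lastName", StringType)
    def readCount(): Long = spark.read.schema(schema)
      .option("mode", "FAILFAST")
      .option("multiline", "true")
      .options(Map("encoding" -> "UTF-16BE"))
      .json(testFile(fileName))
      .count()
    if (bypassParser) {
      // assumption: the expected behaviour with the bypass enabled still needs to be asserted
      readCount()
    } else {
      intercept[SparkException] { readCount() }  // mismatched encoding fails under FAILFAST
    }
  }
}
```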
[GitHub] spark issue #21970: [SPARK-24996][SQL] Use DSL in DeclarativeAggregate
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21970 LGTM pending Jenkins --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21998: [SPARK-24940][SQL] Use IntegerLiteral in ResolveC...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21998 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/21986 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207846166 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1930,6 +1930,12 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _executorAllocationManager.foreach(_.stop()) } +if (_dagScheduler != null) { --- End diff -- Add a comment --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207845712 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus + * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is + * from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to + * collect enough global sync requests within a configured time, fail all the requests and return + * an Exception with timeout message. + */ +private[spark] class BarrierCoordinator( +timeoutInSecs: Long, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private lazy val timer = new Timer("BarrierCoordinator barrier epoch increment timer") --- End diff -- Add a comment above this line? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21898: [SPARK-24817][Core] Implement BarrierTaskContext.barrier...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21898 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21986 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21986 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94273/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21986 **[Test build #94273 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94273/testReport)** for PR 21986 at commit [`37e221c`](https://github.com/apache/spark/commit/37e221c2eb79ec43e61ed2b4a61f206100eaeb42). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r207834660 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -496,6 +496,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp executorDataMap.keySet.toSeq } + override def getNumSlots(): Int = { +executorDataMap.values.foldLeft(0) { (num, executor) => + num + executor.totalCores / scheduler.CPUS_PER_TASK --- End diff -- As mentioned in the method description of `SchedulerBackend.getNumSlots()`: ``` * Note that please don't cache the value returned by this method, because the number can change * due to add/remove executors. ``` It shall be fine to cache that within different stages of a job, but it requires a few more changes that will make the current PR more complicated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22001: [SPARK-24819][CORE] Fail fast when no enough slot...
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/22001#discussion_r207833047 --- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosFineGrainedSchedulerBackend.scala --- @@ -453,4 +453,8 @@ private[spark] class MesosFineGrainedSchedulerBackend( super.applicationId } + override def getNumSlots(): Int = { +// TODO support this method for MesosFineGrainedSchedulerBackend --- End diff -- Only `MesosFineGrainedSchedulerBackend` would break; we still support `MesosCoarseGrainedSchedulerBackend`. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207827294 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus + * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is + * from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to + * collect enough global sync requests within a configured time, fail all the requests and return + * an Exception with timeout message. + */ +private[spark] class BarrierCoordinator( +timeoutInSecs: Long, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private lazy val timer = new Timer("BarrierCoordinator barrier epoch increment timer") --- End diff -- I opened https://issues.apache.org/jira/browse/SPARK-25030 to track the issue. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21087: [SPARK-23997][SQL] Configurable maximum number of bucket...
Github user ferdonline commented on the issue: https://github.com/apache/spark/pull/21087 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207823603 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus + * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is + * from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to + * collect enough global sync requests within a configured time, fail all the requests and return + * an Exception with timeout message. + */ +private[spark] class BarrierCoordinator( +timeoutInSecs: Long, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private lazy val timer = new Timer("BarrierCoordinator barrier epoch increment timer") --- End diff -- This is certainly a potential bug in `SparkSubmit` and not related to the changes made in this PR, I don't feel it shall block this PR. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207822218 --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala --- @@ -1930,6 +1930,12 @@ class SparkContext(config: SparkConf) extends Logging { Utils.tryLogNonFatalError { _executorAllocationManager.foreach(_.stop()) } +if (_dagScheduler != null) { --- End diff -- This is to fix https://github.com/apache/spark/pull/21898#issuecomment-410499090 ; previously, the LiveListenerBus was stopped before we stopped the DAGScheduler. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
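A sketch of the ordering being described, together with the comment requested earlier in the thread; only the `if (_dagScheduler != null)` guard comes from the diff, the block body is an assumption.

```scala
// Hypothetical sketch, not the final patch:
// Stop the DAGScheduler before the LiveListenerBus, so that events posted while the
// DAGScheduler shuts down can still be delivered to listeners.
if (_dagScheduler != null) {
  Utils.tryLogNonFatalError {
    _dagScheduler.stop()
  }
  _dagScheduler = null
}
// ...the listener bus is stopped only after this point...
```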
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user jiangxb1987 commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207821774 --- Diff: core/src/main/scala/org/apache/spark/BarrierTaskContext.scala --- @@ -39,6 +44,22 @@ class BarrierTaskContext( extends TaskContextImpl(stageId, stageAttemptNumber, partitionId, taskAttemptId, attemptNumber, taskMemoryManager, localProperties, metricsSystem, taskMetrics) { + // Find the driver side RPCEndpointRef of the coordinator that handles all the barrier() calls. + private val barrierCoordinator: RpcEndpointRef = { +val env = SparkEnv.get +RpcUtils.makeDriverRef("barrierSync", env.conf, env.rpcEnv) + } + + private val timer = new Timer("Barrier task timer for barrier() calls.") + + // Local barrierEpoch that identify a barrier() call from current task, it shall be identical + // with the driver side epoch. + private var barrierEpoch = 0 + + // Number of tasks of the current barrier stage, a barrier() call must collect enough requests + // from different tasks within the same barrier stage attempt to succeed. + private lazy val numTasks = getTaskInfos().size --- End diff -- If we change it to a `def`, then we have to call `getTaskInfos()` every time; the current `lazy val` calls `getTaskInfos()` only once. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
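The difference being discussed, in isolation (illustrative only; the class and names below are made up for the example):

```scala
// Illustrative only: a lazy val evaluates its right-hand side once, on first access,
// while a def re-evaluates it on every call.
class Example(lookup: () => Seq[Int]) {
  lazy val cachedSize: Int = lookup().size   // lookup() runs at most once
  def freshSize: Int = lookup().size         // lookup() runs on every call
}
```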
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r207819972 --- Diff: core/src/main/scala/org/apache/spark/Partitioner.scala --- @@ -166,7 +170,13 @@ class RangePartitioner[K : Ordering : ClassTag, V]( // Assume the input partitions are roughly balanced and over-sample a little bit. val sampleSizePerPartition = math.ceil(3.0 * sampleSize / rdd.partitions.length).toInt val (numItems, sketched) = RangePartitioner.sketch(rdd.map(_._1), sampleSizePerPartition) - if (numItems == 0L) { + // get the sampled data + sampledArray = sketched.foldLeft(sampledArray)((total, sample) => { --- End diff -- Do we need to always create `sampledArray` and store it into a `var`? It may lead to overhead when the execution goes to L182. It would be good to calculate only the length here and to create the array at L179. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
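One way to read the suggestion, as a generic sketch (not the actual Partitioner code, and the names are hypothetical): keep the cheap count eager and defer building the array until a branch actually needs it.

```scala
// Generic sketch of the suggestion: the early-return path never pays for the array,
// because only the count is computed eagerly and the array is materialized lazily.
def process(sketched: Seq[Array[Int]]): Unit = {
  val numSampled = sketched.iterator.map(_.length).sum
  lazy val sampledArray: Array[Int] = sketched.flatten.toArray  // built only if used
  if (numSampled == 0) {
    // early-return path: sampledArray is never materialized
  } else {
    println(sampledArray.length)  // the expensive array is only built here
  }
}
```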
[GitHub] spark issue #21087: [SPARK-23997][SQL] Configurable maximum number of bucket...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21087 **[Test build #94274 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94274/testReport)** for PR 21087 at commit [`8ddc4eb`](https://github.com/apache/spark/commit/8ddc4ebed9623d571eb778b56a542f91db43f743). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21087: [SPARK-23997][SQL] Configurable maximum number of bucket...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21087 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/94274/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21087: [SPARK-23997][SQL] Configurable maximum number of bucket...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21087 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #22005: [SPARK-16817][CORE][WIP] Use Alluxio to improve stabilit...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/22005 Can one of the admins verify this patch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22005: [SPARK-16817][CORE][WIP] Use Alluxio to improve s...
GitHub user Chopinxb opened a pull request: https://github.com/apache/spark/pull/22005 [SPARK-16817][CORE][WIP] Use Alluxio to improve stability of shuffle by replication of shuffle data ## What changes were proposed in this pull request? In this PR, I propose to use Alluxio to help store shuffle data in order to improve the stability of complicated OLAP tasks. **Motivation** Currently, when there is a shuffle fetch failure (the NodeManager hosting the shuffle service crashed), Spark reruns the previous stage to reproduce the shuffle data. This works well, but in some cases the price of recomputation is unacceptable. With this PR, when there is a shuffle fetch failure, the reduce side retries fetching the shuffle data from Alluxio to avoid recomputation. **Usage** 1. Enable this feature in spark-default.conf: `spark.alluxio.shuffle.enabled true` ## How was this patch tested? Manual tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/Chopinxb/spark spark-shuffle-alluxio Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/22005.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #22005 commit a4371cfaf6672fc67cc2961e23f241dda49314f8 Author: XiaoBang Date: 2018-06-20T00:49:24Z use alluxio to improve stability of shuffle commit 65659882839dc626e86f1d3dd73544eb2c28178b Author: xiaobang213452 Date: 2018-08-06T05:12:31Z update style commit 20cabe1419f6eb382089a6faecede6cb420619d9 Author: xiaobang213452 Date: 2018-08-06T08:17:42Z update style --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
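For reference, the same switch set programmatically; the property name is the one introduced by this PR and is not part of upstream Spark.

```scala
// Illustration only: enabling the proposed feature via SparkConf instead of
// spark-defaults.conf. spark.alluxio.shuffle.enabled comes from this PR.
import org.apache.spark.SparkConf

val conf = new SparkConf().set("spark.alluxio.shuffle.enabled", "true")
```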
[GitHub] spark issue #21087: [SPARK-23997][SQL] Configurable maximum number of bucket...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21087 **[Test build #94274 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94274/testReport)** for PR 21087 at commit [`8ddc4eb`](https://github.com/apache/spark/commit/8ddc4ebed9623d571eb778b56a542f91db43f743). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21859: [SPARK-24900][SQL]Speed up sort when the dataset ...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21859#discussion_r207817068 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/SmallDataSortBenchmark.scala --- @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.benchmark + +import java.io.File + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.util.{Benchmark, Utils} + +/** + * The benchmarks aims to measure performance of + * [SPARK-24900][SQL]speed up sort when the dataset is small + */ +object SmallDataSortBenchmark { + + val conf = new SparkConf() + + val spark = SparkSession.builder +.master("local[1]") +.appName("speed up sort when the dataset is small") +.config(conf) +.getOrCreate() + + import spark.implicits._ + + def withTempPath(f: File => Unit): Unit = { +val path = Utils.createTempDir() +path.delete() +try f(path) finally Utils.deleteRecursively(path) + } + + def run(rowsNum: Int): Unit = { +val factor = 1000 +val key = rowsNum / 2 +val benchmark = new Benchmark("speed up sort when the dataset is small", rowsNum * factor) +withTempPath { path => + // scalastyle:off println + benchmark.out.println("Preparing data for benchmarking ...") + // scalastyle:on println + + val list = (0 to factor).toList + + spark.sparkContext.range(0, rowsNum, 1) +.flatMap(num => { + list.map(x => (num, x)) +}) +.toDF("key", "value") +.write +.option("encoding", "UTF-8") +.json(path.getAbsolutePath) + + benchmark.addCase("sort", 10) { _ => +val dataset = spark.read.json(path.getAbsolutePath) +dataset.createOrReplaceTempView("src") +val result = spark. + sql(s"select * from src where key = $key order by value").collectAsList().size() + + } + + benchmark.run() --- End diff -- Would you please add the performance results without and with this PR to this space like other benchmarks? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #22002: [FOLLOW-UP][SPARK-23772][SQL] Provide an option t...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/22002 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21989: [SPARK-25003][PYSPARK] Use SessionExtensions in Pyspark
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21989 Is this the same as #21990? Would it be possible to close this one? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21986#discussion_r207816072 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -210,3 +221,66 @@ case class ArrayTransform( override def prettyName: String = "transform" } + +/** + * Filters entries in a map using the provided function. + */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Filters entries in a map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(1, 0, 2, 2, 3, -1), (k, v) -> k > v); + [1 -> 0, 3 -> -1] + """, +since = "2.4.0") +case class MapFilter( +input: Expression, +function: Expression) + extends MapBasedUnaryHigherOrderFunction with CodegenFallback { + + @transient val (keyType, valueType, valueContainsNull) = input.dataType match { +case MapType(kType, vType, vContainsNull) => (kType, vType, vContainsNull) +case _ => + val MapType(kType, vType, vContainsNull) = MapType.defaultConcreteType + (kType, vType, vContainsNull) + } + + @transient lazy val (keyVar, valueVar) = { +val args = function.asInstanceOf[LambdaFunction].arguments +(args.head.asInstanceOf[NamedLambdaVariable], args.tail.head.asInstanceOf[NamedLambdaVariable]) + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapFilter = { +function match { + case LambdaFunction(_, _, _) => +copy(function = f(function, (keyType, false) :: (valueType, valueContainsNull) :: Nil)) +} + } + + override def nullable: Boolean = input.nullable + + override def eval(input: InternalRow): Any = { +val m = this.input.eval(input).asInstanceOf[MapData] +if (m == null) { + null +} else { + val retKeys = new mutable.ListBuffer[Any] + val retValues = new mutable.ListBuffer[Any] --- End diff -- But I just checked that in `ArrayFilter` you initialized it with the number of incoming elements. So i think there is no difference in terms of performance, as using an upper value for the number of output elements we are sure no copy is performed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21986 **[Test build #94273 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94273/testReport)** for PR 21986 at commit [`37e221c`](https://github.com/apache/spark/commit/37e221c2eb79ec43e61ed2b4a61f206100eaeb42). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21986 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1839/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21986 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21087: [SPARK-23997][SQL] Configurable maximum number of...
Github user ferdonline commented on a diff in the pull request: https://github.com/apache/spark/pull/21087#discussion_r207815199 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -580,6 +580,11 @@ object SQLConf { .booleanConf .createWithDefault(true) + val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.bucketing.maxBuckets") +.doc("The maximum number of buckets allowed. Defaults to 10") +.longConf --- End diff -- I was following the convention used in config entries, where integral values use longConf, without making further changes. However, I agree we could update the class type as well to match. I will submit the patch. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
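A sketch of what the entry might look like with an Int-typed conf; the key and doc text mirror the diff above, while the `checkValue` sanity check and the default value are assumptions for illustration.

```scala
// Hypothetical sketch: the same entry declared as an Int conf rather than a Long conf,
// with an optional sanity check; the default value below is a placeholder.
val BUCKETING_MAX_BUCKETS = buildConf("spark.sql.bucketing.maxBuckets")
  .doc("The maximum number of buckets allowed.")
  .intConf
  .checkValue(_ > 0, "the maximum number of buckets must be positive")
  .createWithDefault(100000)
```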
[GitHub] spark issue #22002: [FOLLOW-UP][SPARK-23772][SQL] Provide an option to ignor...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/22002 Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21898: [SPARK-24817][Core] Implement BarrierTaskContext....
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21898#discussion_r207814955 --- Diff: core/src/main/scala/org/apache/spark/BarrierCoordinator.scala --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark + +import java.util.{Timer, TimerTask} +import java.util.concurrent.ConcurrentHashMap +import java.util.function.Consumer + +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.internal.Logging +import org.apache.spark.rpc.{RpcCallContext, RpcEnv, ThreadSafeRpcEndpoint} +import org.apache.spark.scheduler.{LiveListenerBus, SparkListener, SparkListenerStageCompleted} + +/** + * For each barrier stage attempt, only at most one barrier() call can be active at any time, thus + * we can use (stageId, stageAttemptId) to identify the stage attempt where the barrier() call is + * from. + */ +private case class ContextBarrierId(stageId: Int, stageAttemptId: Int) { + override def toString: String = s"Stage $stageId (Attempt $stageAttemptId)" +} + +/** + * A coordinator that handles all global sync requests from BarrierTaskContext. Each global sync + * request is generated by `BarrierTaskContext.barrier()`, and identified by + * stageId + stageAttemptId + barrierEpoch. Reply all the blocking global sync requests upon + * all the requests for a group of `barrier()` calls are received. If the coordinator is unable to + * collect enough global sync requests within a configured time, fail all the requests and return + * an Exception with timeout message. + */ +private[spark] class BarrierCoordinator( +timeoutInSecs: Long, +listenerBus: LiveListenerBus, +override val rpcEnv: RpcEnv) extends ThreadSafeRpcEndpoint with Logging { + + private lazy val timer = new Timer("BarrierCoordinator barrier epoch increment timer") --- End diff -- Will we identify the underlying reason before merging to master? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21986 **[Test build #94272 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94272/testReport)** for PR 21986 at commit [`9bbaa3b`](https://github.com/apache/spark/commit/9bbaa3b18493fe5e77652b7f39bdc5a6732771bb). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21986 Build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21986 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1838/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21986#discussion_r207813180 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -210,3 +221,66 @@ case class ArrayTransform( override def prettyName: String = "transform" } + +/** + * Filters entries in a map using the provided function. + */ +@ExpressionDescription( +usage = "_FUNC_(expr, func) - Filters entries in a map using the function.", +examples = """ +Examples: + > SELECT _FUNC_(map(1, 0, 2, 2, 3, -1), (k, v) -> k > v); + [1 -> 0, 3 -> -1] + """, +since = "2.4.0") +case class MapFilter( +input: Expression, +function: Expression) + extends MapBasedUnaryHigherOrderFunction with CodegenFallback { + + @transient val (keyType, valueType, valueContainsNull) = input.dataType match { +case MapType(kType, vType, vContainsNull) => (kType, vType, vContainsNull) +case _ => + val MapType(kType, vType, vContainsNull) = MapType.defaultConcreteType + (kType, vType, vContainsNull) + } + + @transient lazy val (keyVar, valueVar) = { +val args = function.asInstanceOf[LambdaFunction].arguments +(args.head.asInstanceOf[NamedLambdaVariable], args.tail.head.asInstanceOf[NamedLambdaVariable]) + } + + override def bind(f: (Expression, Seq[(DataType, Boolean)]) => LambdaFunction): MapFilter = { +function match { + case LambdaFunction(_, _, _) => +copy(function = f(function, (keyType, false) :: (valueType, valueContainsNull) :: Nil)) +} + } + + override def nullable: Boolean = input.nullable + + override def eval(input: InternalRow): Any = { +val m = this.input.eval(input).asInstanceOf[MapData] +if (m == null) { + null +} else { + val retKeys = new mutable.ListBuffer[Any] + val retValues = new mutable.ListBuffer[Any] --- End diff -- I think it is better as here we are always appending (and then creating an array from it). Appending a value is always O(1) for `ListBuffer`, while in `ArrayBuffer` it is: O(1) if the length of the underlying allocated array is bigger than the number of elements in the list plus one, O(n) otherwise (since it has to create a new array and copy the old one). As the initial value for the length of the underlying array in `ArrayBuffer` is 16, this means that for output values with more than 16 elements `ListBuffer` saves at least one copy. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
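To make the trade-off above concrete, here is a small standalone sketch (not code from the PR) of the append-then-materialize pattern under discussion; `ListBuffer` gives constant-time appends and pays a single copy when converting to an array, whereas `ArrayBuffer` starts from a 16-slot backing array and has to reallocate and copy whenever it outgrows it.

```scala
import scala.collection.mutable

// Mirrors the MapFilter eval loop: keep only the entries the predicate accepts,
// appending as we go and materializing arrays at the end.
def collectFiltered(
    pairs: Seq[(Int, Int)],
    keep: (Int, Int) => Boolean): (Array[Any], Array[Any]) = {
  val retKeys = new mutable.ListBuffer[Any]    // O(1) append, no backing-array resizes
  val retValues = new mutable.ListBuffer[Any]
  pairs.foreach { case (k, v) =>
    if (keep(k, v)) {
      retKeys += k
      retValues += v
    }
  }
  (retKeys.toArray, retValues.toArray)         // single copy when materializing
}

// Example: collectFiltered(Seq(1 -> 0, 2 -> 2, 3 -> -1), (k, v) => k > v)
// keeps the entries 1 -> 0 and 3 -> -1, as in the _FUNC_ example above.
```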
[GitHub] spark pull request #21860: [SPARK-24901][SQL]Merge the codegen of RegularHas...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21860#discussion_r207809333 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/WholeStageCodegenSuite.scala --- @@ -232,6 +232,23 @@ class WholeStageCodegenSuite extends QueryTest with SharedSQLContext { } } + test("SPARK-24901 check merge FastHashMap and RegularHashMap generate code max size") { +val caseNumber = 80 +// merge fastHashMap and regularHashMap generate code max size +val codeWithLongFunctions = genGroupByCode(caseNumber) +val (_, maxCodeSize) = CodeGenerator.compile(codeWithLongFunctions) +assert(maxCodeSize < 13500) --- End diff -- What does `13500` mean? Is it the current max code size? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21970: [SPARK-24996][SQL] Use DSL in DeclarativeAggregate
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21970 **[Test build #94271 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94271/testReport)** for PR 21970 at commit [`9103b9c`](https://github.com/apache/spark/commit/9103b9cc488e032f1be452f9ee5964c7aa7b244c). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21970: [SPARK-24996][SQL] Use DSL in DeclarativeAggregate
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21970 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21970: [SPARK-24996][SQL] Use DSL in DeclarativeAggregate
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21970 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1837/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21986: [SPARK-23937][SQL] Add map_filter SQL function
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21986#discussion_r207808648 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala --- @@ -123,7 +125,10 @@ trait HigherOrderFunction extends Expression { } } -trait ArrayBasedHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes { +/** + * Trait for functions having as input one argument and one function. + */ +trait UnaryHigherOrderFunction extends HigherOrderFunction with ExpectsInputTypes { --- End diff -- I called it `Unary` as it takes one input argument and one function. Honestly, I can't think of a better name without becoming very verbose. If you have a better suggestion, I am happy to follow it. I will add the `nullSafeEval`, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21931: [SPARK-24978][SQL]Add spark.sql.fast.hash.aggregate.row....
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/21931 Does this work when we set the parameter to `30`? I am afraid that several arrays with size `0x7fff` would be allocated. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21860: [SPARK-24901][SQL]Merge the codegen of RegularHashMap an...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21860 **[Test build #94270 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94270/testReport)** for PR 21860 at commit [`dfa549e`](https://github.com/apache/spark/commit/dfa549e4ed782ddfff6df8fdb861d04a7465b0ba). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21403 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21403 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1836/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21919: [SPARK-24933][SS] Report numOutputRows in SinkProgress
Github user vackosar commented on the issue: https://github.com/apache/spark/pull/21919 @jose-torres @zsxwing I will exclude the SinkProgress constructor from the binary compatibility check, as this object is constructed internally by Spark. That will remove the current MiMa test failure. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
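For context, Spark records intentional binary-compatibility breaks as MiMa `ProblemFilters` entries; the sketch below shows what such an exclusion could look like. The problem type and fully qualified class name are illustrative assumptions, not the exact entry added in this PR.

```scala
import com.typesafe.tools.mima.core.{DirectMissingMethodProblem, ProblemFilters}

// Hypothetical entry for project/MimaExcludes.scala: ignore signature changes to the
// SinkProgress constructor, since the class is only ever instantiated by Spark itself.
val sinkProgressExcludes = Seq(
  ProblemFilters.exclude[DirectMissingMethodProblem](
    "org.apache.spark.sql.streaming.SinkProgress.this")
)
```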
[GitHub] spark issue #21860: [SPARK-24901][SQL]Merge the codegen of RegularHashMap an...
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21860 ok to test. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21403: [SPARK-24341][SQL] Support only IN subqueries with the s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21403 **[Test build #94269 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94269/testReport)** for PR 21403 at commit [`eb1dfb7`](https://github.com/apache/spark/commit/eb1dfb7e0873b8479ea54d223b7fde3dcefa4834). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21403: [SPARK-24341][SQL] Support only IN subqueries wit...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/21403#discussion_r207805905 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/expressions.scala --- @@ -505,6 +505,7 @@ object NullPropagation extends Rule[LogicalPlan] { // If the value expression is NULL then transform the In expression to null literal. case In(Literal(null, _), _) => Literal.create(null, BooleanType) + case InSubquery(Seq(Literal(null, _)), _) => Literal.create(null, BooleanType) --- End diff -- Thanks for your comment. I checked it again and I am pretty sure no regression is introduced. We don't have many optimizer rules using In and all the others were and are applied only to In with a list of literals. I am adding this and the other tests. Thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21931: [SPARK-24978][SQL]Add spark.sql.fast.hash.aggrega...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21931#discussion_r207803961 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1437,6 +1437,15 @@ object SQLConf { .intConf .createWithDefault(20) + val FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT = +buildConf("spark.sql.fast.hash.aggregate.row.max.capacity.bit") + .internal() + .doc("Capacity for the max number of rows to be held in memory by the fast hash aggregate " + +"product operator (e.g: configuration 16 capacity size is 65536).") --- End diff -- Can we describe something more about this value (e.g. that it is a `bit`, not a `value`)? Can we also describe the default value? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21931: [SPARK-24978][SQL]Add spark.sql.fast.hash.aggrega...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/21931#discussion_r207803415 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1437,6 +1437,15 @@ object SQLConf { .intConf .createWithDefault(20) + val FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT = +buildConf("spark.sql.fast.hash.aggregate.row.max.capacity.bit") --- End diff -- This name looks too long. How about `spark.sql.codegen.aggregate.map.row.capacitybit` or something similar? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21931: [SPARK-24978][SQL]Add spark.sql.fast.hash.aggregate.row....
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21931 cc: @cloud-fan @hvanhovell --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21895 **[Test build #94268 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/94268/testReport)** for PR 21895 at commit [`14ae790`](https://github.com/apache/spark/commit/14ae790350b88c9ed64d63bd67cf21a22f8ffdd9). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21931: [SPARK-24978][SQL]Add spark.sql.fast.hash.aggrega...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21931#discussion_r207802603 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala --- @@ -1437,6 +1437,15 @@ object SQLConf { .intConf .createWithDefault(20) + val FAST_HASH_AGGREGATE_MAX_ROWS_CAPACITY_BIT = +buildConf("spark.sql.fast.hash.aggregate.row.max.capacity.bit") + .internal() + .doc("Capacity for the max number of rows to be held in memory by the fast hash aggregate " + +"product operator (e.g: configuration 16 capacity size is 65536).") + .intConf + .checkValue(bit => bit >= 1 && bit <= 30, "The bit value must be in [1, 30].") --- End diff -- Do we need to accept these small values, e.g., 2^1, 2^2, ...? I think they are meaningless... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
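Since the setting above is expressed as a bit count rather than a row count, a tiny sketch of the assumed relationship (capacity = 2^bit, which matches the "16 gives 65536" example in the doc string) may help; it is illustrative only and not code from the PR.

```scala
// Assumed mapping from the configured bit to the fast hash map's row capacity.
def capacityForBit(bit: Int): Int = {
  require(bit >= 1 && bit <= 30, "The bit value must be in [1, 30].")
  1 << bit
}

// capacityForBit(16) == 65536
// capacityForBit(30) == 1073741824, which is why very large values raise the
// allocation concern mentioned earlier in the thread.
```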
[GitHub] spark pull request #21917: [SPARK-24720][STREAMING-KAFKA] add option to alig...
Github user QuentinAmbard commented on a diff in the pull request: https://github.com/apache/spark/pull/21917#discussion_r207802444 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala --- @@ -223,17 +240,46 @@ private[spark] class DirectKafkaInputDStream[K, V]( }.getOrElse(offsets) } - override def compute(validTime: Time): Option[KafkaRDD[K, V]] = { -val untilOffsets = clamp(latestOffsets()) -val offsetRanges = untilOffsets.map { case (tp, uo) => - val fo = currentOffsets(tp) - OffsetRange(tp.topic, tp.partition, fo, uo) + /** + * Return the offset range. For non consecutive offset the last offset must have record. + * If offsets have missing data (transaction marker or abort), increases the + * range until we get the requested number of record or no more records. + * Because we have to iterate over all the records in this case, + * we also return the total number of records. + * @param offsets the target range we would like if offset were continue + * @return (totalNumberOfRecords, updated offset) + */ + private def alignRanges(offsets: Map[TopicPartition, Long]): Iterable[OffsetRange] = { +if (nonConsecutive) { + val localRw = rewinder() + val localOffsets = currentOffsets + context.sparkContext.parallelize(offsets.toList).mapPartitions(tpos => { --- End diff -- Are you suggesting I should create a new kafkaRDD instead, and consume from this RDD to get the last offset range? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21895 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution-unified/1835/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21895 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21917: [SPARK-24720][STREAMING-KAFKA] add option to align range...
Github user QuentinAmbard commented on the issue: https://github.com/apache/spark/pull/21917 > By failed, you mean returned an empty collection after timing out, even though records should be available? You don't. You also don't know that it isn't just lost because kafka skipped a message. AFAIK from the information you have from a kafka consumer, once you start allowing gaps in offsets, you don't know. OK, that's interesting; my understanding was that if you successfully poll and get results, you are 100% sure that you don't lose anything. Do you have more details on that? Why would kafka skip a record while consuming? > Have you tested comparing the results of consumer.endOffsets for consumers with different isolation levels? endOffsets returns the last offset (same as seekToEnd). But you're right that the easiest solution for us would be to have something like a seekToLastRecord method instead. Maybe that is something we could also ask for? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
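For reference, a minimal sketch of the two consumer calls compared above, using the plain Kafka consumer API; the code is illustrative and not from the PR, and how the reported offset relates to the last actual record under transactions is exactly the open question here, so treat the comment as an assumption.

```scala
import java.util.{Collections => JCollections}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Both calls report the partition's end offset as seen by this consumer's isolation
// level; neither necessarily points at the last real record when the tail of the log
// holds transaction markers or aborted records, which is the gap a hypothetical
// seekToLastRecord would close.
def endOffsetTwoWays(
    consumer: KafkaConsumer[Array[Byte], Array[Byte]],
    tp: TopicPartition): (Long, Long) = {
  val viaEndOffsets = consumer.endOffsets(JCollections.singleton(tp)).get(tp).longValue()
  consumer.seekToEnd(JCollections.singleton(tp))
  val viaSeekToEnd = consumer.position(tp)
  (viaEndOffsets, viaSeekToEnd)
}
```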
[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21948 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21895: [SPARK-24948][SHS] Delegate check access permissions to ...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/21895 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21948: [SPARK-24991][SQL] use InternalRow in DataSourceW...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/21948#discussion_r207801224 --- Diff: sql/core/src/main/java/org/apache/spark/sql/sources/v2/writer/DataWriterFactory.java --- @@ -33,7 +33,10 @@ public interface DataWriterFactory extends Serializable { /** - * Returns a data writer to do the actual writing work. + * Returns a data writer to do the actual writing work. Note that, Spark will reuse the same data + * object instance when sending data to the data writer, for better performance. Data writers + * are responsible for defensive copies if necessary, e.g. copy the data before buffer it in a + * list. --- End diff -- I'll fix it in my next PR, thanks! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
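The doc change under discussion tells sinks that Spark reuses the same row instance across `write()` calls, so a writer that buffers rows must copy them first. A minimal sketch of that pattern follows, assuming the Spark 2.4-era `DataWriter[InternalRow]` interface from the data source v2 API; the buffering writer and its commit message class are hypothetical.

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.writer.{DataWriter, WriterCommitMessage}

// Hypothetical commit message; a real sink would carry whatever its driver-side
// commit logic needs.
case class BufferedCommit(numRows: Long) extends WriterCommitMessage

// Spark may reuse the same InternalRow instance for every write() call, so buffering
// the reference as-is would leave every buffered entry pointing at the latest row.
// Copying before buffering is the "defensive copy" the contract above asks for.
class BufferingDataWriter extends DataWriter[InternalRow] {
  private val buffer = new ArrayBuffer[InternalRow]()

  override def write(record: InternalRow): Unit = {
    buffer += record.copy()   // defensive copy before buffering
  }

  override def commit(): WriterCommitMessage = BufferedCommit(buffer.size.toLong)

  override def abort(): Unit = buffer.clear()
}
```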