[GitHub] spark pull request #21369: [SPARK-22713][CORE] ExternalAppendOnlyMap leaks w...
Github user eyalfa commented on a diff in the pull request: https://github.com/apache/spark/pull/21369#discussion_r191655937

--- Diff: core/src/test/scala/org/apache/spark/util/collection/ExternalAppendOnlyMapSuite.scala ---
@@ -414,7 +415,106 @@ class ExternalAppendOnlyMapSuite extends SparkFunSuite with LocalSparkContext {
     sc.stop()
   }

-  test("external aggregation updates peak execution memory") {
+  test("SPARK-22713 spill during iteration leaks internal map") {
+    val size = 1000
+    val conf = createSparkConf(loadDefaults = true)
+    sc = new SparkContext("local-cluster[1,1,1024]", "test", conf)
+    val map = createExternalMap[Int]
--- End diff --

@cloud-fan, how do you suggest to progress with this?
[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21437 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3694/ Test PASSed.
[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21437 Merged build finished. Test PASSed.
[GitHub] spark issue #21448: [SPARK-24408][SQL][DOC] Move abs, bitwiseNOT, isnan, nan...
Github user jaceklaskowski commented on the issue: https://github.com/apache/spark/pull/21448 It is such a small change that I don't think it's going to take long to get merged. Reaching out to friendly folks to reach a consensus on it :) /cc @srowen @holdenk
[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21437 **[Test build #91288 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91288/testReport)** for PR 21437 at commit [`6fef2f1`](https://github.com/apache/spark/commit/6fef2f1808c559dfcf73a7678e655ba4a9aff62a).
[GitHub] spark pull request #21434: [SPARK-24331][SparkR][SQL] Adding arrays_overlap,...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21434
[GitHub] spark issue #21434: [SPARK-24331][SparkR][SQL] Adding arrays_overlap, array_...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/21434 merged to master, thanks!
[GitHub] spark pull request #21366: [SPARK-24248][K8S] Use the Kubernetes API to popu...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21366#discussion_r191653358

--- Diff: pom.xml ---
@@ -760,6 +760,12 @@
       1.10.19
       test
+
--- End diff --

Right - not saying we should completely reinvent the wheel, of course. Does including this in the root pom bring the rxjava jar into spark-core's dependency chain? If so, we should avoid it. On the license - you are right, for Apache 2.0 we don't need to list it; but for `jmock`, yes, the way you have it.
[GitHub] spark issue #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF should assi...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21427 Please let users decide whether they are resolved by names or by position.
[GitHub] spark pull request #21288: [SPARK-24206][SQL] Improve FilterPushdownBenchmar...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21288#discussion_r191650442

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala ---
@@ -131,211 +132,214 @@ object FilterPushdownBenchmark {
   }

   /*
+  OpenJDK 64-Bit Server VM 1.8.0_171-b10 on Linux 4.14.26-46.32.amzn1.x86_64
   Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
   Select 0 string row (value IS NULL):  Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
-  Parquet Vectorized                          8452 / 8504         1.9         537.3       1.0X
-  Parquet Vectorized (Pushdown)                274 / 281         57.3          17.4      30.8X
-  Native ORC Vectorized                       8167 / 8185         1.9         519.3       1.0X
-  Native ORC Vectorized (Pushdown)             365 / 379         43.1          23.2      23.1X
+  Parquet Vectorized                          2961 / 3123         5.3         188.3       1.0X
+  Parquet Vectorized (Pushdown)               3057 / 3121         5.1         194.4       1.0X
--- End diff --

Is it a regression?
[GitHub] spark pull request #21288: [SPARK-24206][SQL] Improve FilterPushdownBenchmar...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21288#discussion_r191650406

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala ---
@@ -131,211 +132,214 @@ object FilterPushdownBenchmark {
   }

   /*
+  OpenJDK 64-Bit Server VM 1.8.0_171-b10 on Linux 4.14.26-46.32.amzn1.x86_64
   Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
   Select 0 string row (value IS NULL):  Best/Avg Time(ms)   Rate(M/s)   Per Row(ns)   Relative
-  Parquet Vectorized                          8452 / 8504         1.9         537.3       1.0X
-  Parquet Vectorized (Pushdown)                274 / 281         57.3          17.4      30.8X
-  Native ORC Vectorized                       8167 / 8185         1.9         519.3       1.0X
-  Native ORC Vectorized (Pushdown)             365 / 379         43.1          23.2      23.1X
+  Parquet Vectorized                          2961 / 3123         5.3         188.3       1.0X
+  Parquet Vectorized (Pushdown)               3057 / 3121         5.1         194.4       1.0X
--- End diff --

How about 2.3?
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21246 Merged build finished. Test PASSed.
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21246 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3693/ Test PASSed.
[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+
Github user dbtsai commented on the issue: https://github.com/apache/spark/pull/21459 @felixcheung Yes. All tests passed with JDK8.
[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21452 **[Test build #91286 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91286/testReport)** for PR 21452 at commit [`9881d9c`](https://github.com/apache/spark/commit/9881d9c6a2b1d56e69bb06ee27fd8706f6e0fe43).
[GitHub] spark issue #21458: [SPARK-24419] [Build] Upgrade SBT to 0.13.17 with Scala ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21458 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3692/ Test PASSed.
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21246 **[Test build #91287 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91287/testReport)** for PR 21246 at commit [`6fd8f2f`](https://github.com/apache/spark/commit/6fd8f2fbd37e5193f0ffb1a25a8f4a8c71ab55bd).
[GitHub] spark issue #21458: [SPARK-24419] [Build] Upgrade SBT to 0.13.17 with Scala ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21458 Merged build finished. Test PASSed.
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21246 Jenkins, retest this please.
[GitHub] spark issue #21452: [MINOR][CORE] Log committer class used by HadoopMapRedCo...
Github user felixcheung commented on the issue: https://github.com/apache/spark/pull/21452 ok to test
[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21459 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3691/ Test PASSed.
[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21459 Merged build finished. Test PASSed.
[GitHub] spark pull request #21378: [SPARK-24326][Mesos] add support for local:// sch...
Github user felixcheung commented on a diff in the pull request: https://github.com/apache/spark/pull/21378#discussion_r191647592

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -418,17 +417,33 @@ private[spark] class MesosClusterScheduler(
     envBuilder.build()
   }

+  private def isContainerLocalAppJar(desc: MesosDriverDescription): Boolean = {
+    val isLocalJar = desc.jarUrl.startsWith("local://")
+    val isContainerLocal = desc.conf.getOption("spark.mesos.appJar.local.resolution.mode").exists {
+      case "container" => true
+      case "host" => false
+      case other =>
+        logWarning(s"Unknown spark.mesos.appJar.local.resolution.mode $other, using host.")
+        false
+    }
--- End diff --

It's not very commonly used, but ok to me.
[GitHub] spark issue #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21459 **[Test build #91285 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91285/testReport)** for PR 21459 at commit [`bec3e81`](https://github.com/apache/spark/commit/bec3e81a3522b54692150584c86d1925799c08da).
[GitHub] spark pull request #21459: [SPARK-24420][Build] Upgrade ASM to 6.1 to suppor...
GitHub user dbtsai opened a pull request:

https://github.com/apache/spark/pull/21459

[SPARK-24420][Build] Upgrade ASM to 6.1 to support JDK9+

## What changes were proposed in this pull request?

Upgrade ASM to 6.1 to support JDK9+

## How was this patch tested?

Existing tests.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dbtsai/spark asm

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21459.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21459

commit bec3e81a3522b54692150584c86d1925799c08da
Author: DB Tsai
Date: 2018-05-29T22:35:55Z

    Asm6.1
[GitHub] spark issue #21458: [SPARK-24419] [Build] Upgrade SBT to 0.13.17 with Scala ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21458 **[Test build #91284 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91284/testReport)** for PR 21458 at commit [`48c57e0`](https://github.com/apache/spark/commit/48c57e09dda63259230aa81facb31c1795f602fe).
[GitHub] spark pull request #21458: [SPARK-24419] [Build] Upgrade SBT to 0.13.17 with...
GitHub user dbtsai opened a pull request:

https://github.com/apache/spark/pull/21458

[SPARK-24419] [Build] Upgrade SBT to 0.13.17 with Scala 2.10.7 for JDK9+

## What changes were proposed in this pull request?

Upgrade SBT to 0.13.17 with Scala 2.10.7 for JDK9+

## How was this patch tested?

Existing tests

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dbtsai/spark sbt

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/21458.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #21458

commit 48c57e09dda63259230aa81facb31c1795f602fe
Author: DB Tsai
Date: 2018-05-29T23:54:22Z

    upgrade sbt
[GitHub] spark issue #21379: [SPARK-24327][SQL] Add an option to quote a partition co...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/21379 You mean we need verification code there to check whether the fetched schema has the user-given partition column? If the schema does not have the column, throw an `AnalysisException` or something?
[GitHub] spark issue #21379: [SPARK-24327][SQL] Add an option to quote a partition co...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/21379 Sorry, I just realized this is a wrong direction. Instead of trusting the user inputs, we should verify the user-specified partition columns by using the already fetched table schema info `val tableSchema = JDBCRDD.resolveTable(jdbcOptions)` when building `JDBCRelation`
[GitHub] spark pull request #21379: [SPARK-24327][SQL] Add an option to quote a parti...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/21379#discussion_r191641304

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCRelation.scala ---
@@ -78,7 +79,12 @@ private[sql] object JDBCRelation extends Logging {
     // Overflow and silliness can happen if you subtract then divide.
     // Here we get a little roundoff, but that's (hopefully) OK.
     val stride: Long = upperBound / numPartitions - lowerBound / numPartitions
-    val column = partitioning.column
+    val column = if (jdbcOptions.quotePartitionColumnName) {
+      val dialect = JdbcDialects.get(jdbcOptions.url)
+      dialect.quoteIdentifier(partitioning.column)
--- End diff --

If possible, please do not break the existing behavior. Both should work without specifying the extra option:

```
val df = spark.read.format("jdbc")
  .option("url", urlWithUserAndPass)
  .option("dbtable", "TEST.PEOPLE")
  .option("partitionColumn", "nonQuotedPrtColName")
  .option("lowerBound", 1)
  .option("upperBound", 4)
  .option("numPartitions", 3)
  .load()
```

```
val df = spark.read.format("jdbc")
  .option("url", urlWithUserAndPass)
  .option("dbtable", "TEST.PEOPLE")
  .option("partitionColumn", nonQuotedPrtColName)
  .option("lowerBound", 1)
  .option("upperBound", 4)
  .option("numPartitions", 3)
  .load()
```

Is that possible?
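For context on the suggestion in this thread (verify the user-supplied partition column against the already-fetched schema, then quote it via the dialect), here is a minimal editor's sketch; it is not code from the PR, the helper name is invented, and `IllegalArgumentException` merely stands in for whatever exception Spark would actually raise (e.g. `AnalysisException`):

```scala
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object PartitionColumnSketch {
  // Hypothetical helper: resolve the user-given column case-insensitively against
  // the resolved table schema, then quote the schema's own spelling of the name.
  def verifyAndQuote(userColumn: String, tableSchema: StructType, dialect: JdbcDialect): String =
    tableSchema.fields.find(_.name.equalsIgnoreCase(userColumn)) match {
      case Some(field) => dialect.quoteIdentifier(field.name)
      case None =>
        throw new IllegalArgumentException(
          s"Partition column '$userColumn' not found in [${tableSchema.fieldNames.mkString(", ")}]")
    }

  def main(args: Array[String]): Unit = {
    val schema = StructType(Seq(StructField("THEID", IntegerType), StructField("NAME", StringType)))
    val dialect = JdbcDialects.get("jdbc:h2:mem:testdb")
    // Both "theid" and "THEID" resolve to the same quoted identifier.
    println(verifyAndQuote("theid", schema, dialect))
  }
}
```

With a check like this, both the quoted and unquoted spellings in the two examples above keep working, and a bogus column name fails early instead of producing a broken `WHERE` clause.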
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366 Merged build finished. Test FAILed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21366 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91281/ Test FAILed.
[GitHub] spark issue #21366: [SPARK-24248][K8S] Use the Kubernetes API to populate an...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21366 **[Test build #91281 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91281/testReport)** for PR 21366 at commit [`5b9c00f`](https://github.com/apache/spark/commit/5b9c00fa39d1c83435ca65de5394345e5d6f1f00).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21246 Merged build finished. Test FAILed.
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21246 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91283/ Test FAILed.
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21246 **[Test build #91283 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91283/testReport)** for PR 21246 at commit [`6fd8f2f`](https://github.com/apache/spark/commit/6fd8f2fbd37e5193f0ffb1a25a8f4a8c71ab55bd).

 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21454: [SPARK-24337][Core] Improve error messages for Spark con...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21454 Merged build finished. Test PASSed.
[GitHub] spark issue #21454: [SPARK-24337][Core] Improve error messages for Spark con...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21454 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91279/ Test PASSed.
[GitHub] spark issue #21454: [SPARK-24337][Core] Improve error messages for Spark con...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21454 **[Test build #91279 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91279/testReport)** for PR 21454 at commit [`badbf0e`](https://github.com/apache/spark/commit/badbf0e6766a99565e061063041f231d119d6d3a).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #21378: [SPARK-24326][Mesos] add support for local:// sch...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21378#discussion_r191630562

--- Diff: resource-managers/mesos/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---
@@ -418,17 +417,33 @@ private[spark] class MesosClusterScheduler(
     envBuilder.build()
   }

+  private def isContainerLocalAppJar(desc: MesosDriverDescription): Boolean = {
+    val isLocalJar = desc.jarUrl.startsWith("local://")
+    val isContainerLocal = desc.conf.getOption("spark.mesos.appJar.local.resolution.mode").exists {
+      case "container" => true
+      case "host" => false
+      case other =>
+        logWarning(s"Unknown spark.mesos.appJar.local.resolution.mode $other, using host.")
+        false
+    }
--- End diff --

Nothing wrong but I just find it hard to read. I assume @felixcheung had a similar concern at the core.
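As an aside on the readability point, one arguably more explicit way to express the same decision is a plain pattern match on the option value. This is only an editor's sketch with illustrative names, not the PR's code, and the semantics are meant to match the quoted `getOption(...).exists { ... }` chain: only an explicit "container" value enables container-local resolution; a missing key or an unknown value falls back to host resolution.

```scala
object ResolutionModeSketch {
  def isContainerLocal(mode: Option[String]): Boolean = mode match {
    case Some("container")     => true
    case Some("host") | None   => false
    case Some(other)           =>
      // In Spark this would be logWarning; println keeps the sketch self-contained.
      Console.err.println(s"Unknown resolution mode '$other', using host.")
      false
  }

  def main(args: Array[String]): Unit = {
    assert(isContainerLocal(Some("container")))
    assert(!isContainerLocal(Some("host")))
    assert(!isContainerLocal(None))
    assert(!isContainerLocal(Some("bogus")))
  }
}
```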
[GitHub] spark issue #21426: [SPARK-24384][PYTHON][SPARK SUBMIT] Add .py files correc...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21426 @vanzin, for https://github.com/apache/spark/pull/21426#discussion_r191010819, mind if we proceed in a separate ticket? From my look, it needs some changes to address that comment. I think we can't simply raise an exception, since from `deploy.PythonRunner`'s perspective we can't tell whether that file was downloaded or not. The most appropriate place seems to be `SparkSubmit` and `DependencyUtils.downloadFile`; it seems we should add some code in `DependencyUtils.downloadFile`, since that's where we know the original path and where we download the file locally when needed, and I would like to avoid adding such changes here. It probably needs another review iteration, and the current change doesn't actually target or change the previous behaviour, really.
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191629554

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala ---
@@ -58,39 +46,29 @@ class ContinuousShuffleReadSuite extends StreamTest {
     super.afterEach()
   }

-  test("receiver stopped with row last") {
-    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
-    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
-    send(
-      endpoint,
-      ReceiverEpochMarker(0),
-      ReceiverRow(0, unsafeRow(111))
-    )
+  private implicit def unsafeRow(value: Int) = {
--- End diff --

And where is the `implicit` attribute of this method actually leveraged? I'm not sure it is really needed, but I'm reviewing on the GitHub page, so I might be missing something here.
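For readers wondering how such an implicit conversion gets picked up at all, here is a small self-contained sketch in plain Scala (a stand-in `Row` type instead of Spark's `UnsafeRow`): when the callee fixes the expected element type, each `Int` argument is converted element by element, which is presumably what lets the tests quoted further down pass plain integers like `Iterator(1, 2, 3)` to the writer.

```scala
import scala.language.implicitConversions

object ImplicitRowSketch {
  // Stand-in for UnsafeRow, only to keep the example self-contained.
  final case class Row(value: Int)

  // Analogous to the suite's `private implicit def unsafeRow(value: Int)`.
  implicit def intToRow(value: Int): Row = Row(value)

  def write(rows: Iterator[Row]): Unit = rows.foreach(println)

  def main(args: Array[String]): Unit = {
    // The expected type Iterator[Row] fixes the element type, so each Int
    // literal is run through intToRow; no explicit wrapping is needed.
    write(Iterator(1, 2, 3))
  }
}
```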
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191605388

--- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala ---
@@ -58,39 +46,29 @@ class ContinuousShuffleReadSuite extends StreamTest {
     super.afterEach()
   }

-  test("receiver stopped with row last") {
-    val rdd = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1)
-    val endpoint = rdd.partitions(0).asInstanceOf[ContinuousShuffleReadPartition].endpoint
-    send(
-      endpoint,
-      ReceiverEpochMarker(0),
-      ReceiverRow(0, unsafeRow(111))
-    )
+  private implicit def unsafeRow(value: Int) = {
--- End diff --

Just curious: is there a reason to rearrange these functions, this one and the two below? They look the same, except this function is now `implicit`.
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191629272 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala --- @@ -288,4 +267,153 @@ class ContinuousShuffleReadSuite extends StreamTest { val thirdEpoch = rdd.compute(rdd.partitions(0), ctx).map(_.getUTF8String(0).toString).toSet assert(thirdEpoch == Set("writer1-row1", "writer2-row0")) } + + test("one epoch") { +val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val writer = new RPCContinuousShuffleWriter( + 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) + +writer.write(Iterator(1, 2, 3)) + +assert(readEpoch(reader) == Seq(1, 2, 3)) + } + + test("multiple epochs") { +val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val writer = new RPCContinuousShuffleWriter( + 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) + +writer.write(Iterator(1, 2, 3)) +writer.write(Iterator(4, 5, 6)) + +assert(readEpoch(reader) == Seq(1, 2, 3)) +assert(readEpoch(reader) == Seq(4, 5, 6)) + } + + test("empty epochs") { +val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val writer = new RPCContinuousShuffleWriter( + 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) + +writer.write(Iterator()) +writer.write(Iterator(1, 2)) +writer.write(Iterator()) +writer.write(Iterator()) +writer.write(Iterator(3, 4)) +writer.write(Iterator()) + +assert(readEpoch(reader) == Seq()) +assert(readEpoch(reader) == Seq(1, 2)) +assert(readEpoch(reader) == Seq()) +assert(readEpoch(reader) == Seq()) +assert(readEpoch(reader) == Seq(3, 4)) +assert(readEpoch(reader) == Seq()) + } + + test("blocks waiting for writer") { +val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val writer = new RPCContinuousShuffleWriter( + 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) + +val readerEpoch = reader.compute(reader.partitions(0), ctx) + +val readRowThread = new Thread { + override def run(): Unit = { +assert(readerEpoch.toSeq.map(_.getInt(0)) == Seq(1)) + } +} +readRowThread.start() + +eventually(timeout(streamingTimeout)) { + assert(readRowThread.getState == Thread.State.TIMED_WAITING) +} + +// Once we write the epoch the thread should stop waiting and succeed. +writer.write(Iterator(1)) +readRowThread.join() + } + + test("multiple writer partitions") { --- End diff -- Would we want to have another test which covers out-of-order epoch between writers (if that's valid case for us), or rely on the test in ContinuousShuffleReadRDD? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20697: [SPARK-23010][k8s] Initial checkin of k8s integra...
Github user ssuchter commented on a diff in the pull request: https://github.com/apache/spark/pull/20697#discussion_r191629581 --- Diff: resource-managers/kubernetes/integration-tests/scripts/setup-integration-test-env.sh --- @@ -0,0 +1,91 @@ +#!/usr/bin/env bash + +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +#http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +TEST_ROOT_DIR=$(git rev-parse --show-toplevel) +UNPACKED_SPARK_TGZ="$TEST_ROOT_DIR/target/spark-dist-unpacked" +IMAGE_TAG_OUTPUT_FILE="$TEST_ROOT_DIR/target/image-tag.txt" +DEPLOY_MODE="minikube" +IMAGE_REPO="docker.io/kubespark" +IMAGE_TAG="N/A" +SPARK_TGZ="N/A" + +# Parse arguments +while (( "$#" )); do + case $1 in +--unpacked-spark-tgz) + UNPACKED_SPARK_TGZ="$2" + shift + ;; +--image-repo) + IMAGE_REPO="$2" + shift + ;; +--image-tag) + IMAGE_TAG="$2" + shift + ;; +--image-tag-output-file) + IMAGE_TAG_OUTPUT_FILE="$2" + shift + ;; +--deploy-mode) + DEPLOY_MODE="$2" + shift + ;; +--spark-tgz) + SPARK_TGZ="$2" + shift + ;; +*) + break + ;; + esac + shift +done + +if [[ $SPARK_TGZ == "N/A" ]]; +then + echo "Must specify a Spark tarball to build Docker images against with --spark-tgz." && exit 1; --- End diff -- I'm sure it's possible. But I'd rather not try to do that refactor. I'd be really happy if you wanted to. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21451: [SPARK-24296][CORE][WIP] Replicate large blocks a...
Github user witgo commented on a diff in the pull request: https://github.com/apache/spark/pull/21451#discussion_r191628993

--- Diff: common/network-common/src/main/java/org/apache/spark/network/server/RpcHandler.java ---
@@ -38,15 +38,24 @@
    *
    * This method will not be called in parallel for a single TransportClient (i.e., channel).
    *
+   * The rpc *might* included a data stream in streamData (eg. for uploading a large
+   * amount of data which should not be buffered in memory here). Any errors while handling the
+   * streamData will lead to failing this entire connection -- all other in-flight rpcs will fail.
+   * If stream data is not null, you *must* call streamData.registerStreamCallback
+   * before this method returns.
+   *
    * @param client A channel client which enables the handler to make requests back to the sender
    *               of this RPC. This will always be the exact same object for a particular channel.
    * @param message The serialized bytes of the RPC.
+   * @param streamData StreamData if there is data which is meant to be read via a StreamCallback;
+   *                   otherwise it is null.
    * @param callback Callback which should be invoked exactly once upon success or failure of the
    *                 RPC.
    */
   public abstract void receive(
       TransportClient client,
       ByteBuffer message,
+      StreamData streamData,
--- End diff --

It's not necessary to add a parameter. Change the message parameter to InputStream.
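A rough sketch of the shape being suggested, i.e. exposing the payload itself as a stream instead of adding a second parameter. This is an editor's illustration in Scala with simplified stand-in types, not the actual `RpcHandler` API:

```scala
import java.io.InputStream
import java.nio.ByteBuffer

// Simplified stand-ins for the transport types, only for illustration.
trait TransportClient
trait RpcResponseCallback {
  def onSuccess(response: ByteBuffer): Unit
  def onFailure(e: Throwable): Unit
}

// The handler reads the whole RPC, small or large, from one stream; a caller
// that previously passed a ByteBuffer could wrap it in a ByteArrayInputStream.
abstract class StreamingRpcHandler {
  def receive(client: TransportClient, message: InputStream, callback: RpcResponseCallback): Unit
}
```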
[GitHub] spark issue #21428: [SPARK-24235][SS] Implement continuous shuffle writer fo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21428 Merged build finished. Test PASSed.
[GitHub] spark issue #21428: [SPARK-24235][SS] Implement continuous shuffle writer fo...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21428 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91278/ Test PASSed.
[GitHub] spark issue #21428: [SPARK-24235][SS] Implement continuous shuffle writer fo...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21428 **[Test build #91278 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91278/testReport)** for PR 21428 at commit [`65837ac`](https://github.com/apache/spark/commit/65837ac611991f2db7710d0657e56b222a2f5c74).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21437 Merged build finished. Test FAILed.
[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21437 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91277/ Test FAILed.
[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21437 **[Test build #91277 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91277/testReport)** for PR 21437 at commit [`2ea9cbc`](https://github.com/apache/spark/commit/2ea9cbc80787f1417fa4502c3c2b9b89f46d0632).

 * This patch **fails PySpark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21383#discussion_r191627958

--- Diff: python/pyspark/util.py ---
@@ -55,7 +55,9 @@ def _get_argspec(f):
     """
     # `getargspec` is deprecated since python3.0 (incompatible with function annotations).
     # See SPARK-23569.
-    if sys.version_info[0] < 3:
+    if hasattr(f, '_argspec'):
--- End diff --

Sounds good
[GitHub] spark pull request #21454: [SPARK-24337][Core] Improve error messages for Sp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21454#discussion_r191626037

--- Diff: core/src/test/scala/org/apache/spark/SparkConfSuite.scala ---
@@ -25,14 +25,16 @@ import scala.language.postfixOps
 import scala.util.{Random, Try}

 import com.esotericsoftware.kryo.Kryo
+import org.scalatest.Matchers

 import org.apache.spark.deploy.history.config._
 import org.apache.spark.internal.config._
 import org.apache.spark.network.util.ByteUnit
 import org.apache.spark.serializer.{JavaSerializer, KryoRegistrator, KryoSerializer}
 import org.apache.spark.util.{ResetSystemProperties, RpcUtils}

-class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties {
+class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSystemProperties with
+  Matchers {
--- End diff --

I would do `with Matchers` here.
[GitHub] spark pull request #21454: [SPARK-24337][Core] Improve error messages for Sp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21454#discussion_r191625768

--- Diff: core/src/test/scala/org/apache/spark/SparkConfSuite.scala ---
@@ -339,6 +341,38 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst
     }
   }

+  val defaultIllegalValue = "SomeIllegalValue"
+  val illegalValueTests : Map[String, (SparkConf, String) => Any] = Map(
+    "getTimeAsSeconds" -> (_.getTimeAsSeconds(_)),
+    "getTimeAsSeconds with default" -> (_.getTimeAsSeconds(_, defaultIllegalValue)),
+    "getTimeAsMs" -> (_.getTimeAsMs(_)),
+    "getTimeAsMs with default" -> (_.getTimeAsMs(_, defaultIllegalValue)),
+    "getSizeAsBytes" -> (_.getSizeAsBytes(_)),
+    "getSizeAsBytes with default string" -> (_.getSizeAsBytes(_, defaultIllegalValue)),
+    "getSizeAsBytes with default long" -> (_.getSizeAsBytes(_, 0L)),
+    "getSizeAsKb" -> (_.getSizeAsKb(_)),
+    "getSizeAsKb with default" -> (_.getSizeAsKb(_, defaultIllegalValue)),
+    "getSizeAsMb" -> (_.getSizeAsMb(_)),
+    "getSizeAsMb with default" -> (_.getSizeAsMb(_, defaultIllegalValue)),
+    "getSizeAsGb" -> (_.getSizeAsGb(_)),
+    "getSizeAsGb with default" -> (_.getSizeAsGb(_, defaultIllegalValue)),
+    "getInt" -> (_.getInt(_, 0)),
+    "getLong" -> (_.getLong(_, 0L)),
+    "getDouble" -> (_.getDouble(_, 0.0)),
+    "getBoolean" -> (_.getBoolean(_, false))
+  )
+
+  illegalValueTests.foreach { case (name, getValue) =>
+    test(s"SPARK-24337: $name throws an useful error message with key name") {
+      val key = "SomeKey"
+      val conf = new SparkConf()
+      conf.set(key, "SomeInvalidValue")
+      val thrown = the [IllegalArgumentException] thrownBy {
+        getValue(conf, key)
+      }
+      thrown.getMessage should include (key)
--- End diff --

Shall we stick to `assert` syntax?
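For reference, a sketch of what the assert-style alternative could look like, using ScalaTest's `intercept` instead of the Matchers DSL. This is an editor's illustration, not the PR's code; `parse` is an invented stand-in for the SparkConf getters exercised above:

```scala
import org.scalatest.Assertions._

object AssertStyleSketch {
  // Illustrative stand-in for a SparkConf getter that rejects malformed values.
  def parse(key: String, value: String): Long =
    try value.toLong
    catch {
      case e: NumberFormatException =>
        throw new IllegalArgumentException(s"$key should be a number, but was '$value'", e)
    }

  def main(args: Array[String]): Unit = {
    // intercept returns the thrown exception, so the message can be asserted on directly.
    val thrown = intercept[IllegalArgumentException] {
      parse("SomeKey", "SomeIllegalValue")
    }
    assert(thrown.getMessage.contains("SomeKey"))
  }
}
```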
[GitHub] spark pull request #21454: [SPARK-24337][Core] Improve error messages for Sp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21454#discussion_r191625705

--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -448,6 +473,20 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
     */
   private[spark] def getenv(name: String): String = System.getenv(name)

+  /**
+   * Wrapper method for get*() methods which require some specific value format. This catches
--- End diff --

`get*()` .. ?
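To make the wrapper idea in the quoted doc comment concrete, here is a minimal editor's sketch of such a method; the name `catchIllegalValue` follows the diff, but the signature and body are assumptions, not necessarily what the PR implements. It runs a getter and, on a parse failure, rethrows with the offending key in the message.

```scala
object CatchIllegalValueSketch {
  // Hypothetical wrapper: run the getter, and if parsing fails, surface the key.
  def catchIllegalValue[T](key: String)(getValue: => T): T =
    try getValue
    catch {
      case e: NumberFormatException =>
        throw new IllegalArgumentException(s"Illegal value for config key $key: ${e.getMessage}", e)
    }

  def main(args: Array[String]): Unit = {
    // Example: wrapping a plain toInt so the error names the key that was misconfigured.
    val message =
      try { catchIllegalValue("SomeKey")("oops".toInt); "" }
      catch { case e: IllegalArgumentException => e.getMessage }
    assert(message.contains("SomeKey"))
  }
}
```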
[GitHub] spark pull request #21454: [SPARK-24337][Core] Improve error messages for Sp...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21454#discussion_r191625505

--- Diff: core/src/main/scala/org/apache/spark/SparkConf.scala ---
@@ -265,108 +265,121 @@ class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging with Seria
    * Get a time parameter as seconds; throws a NoSuchElementException if it's not set. If no
    * suffix is provided then seconds are assumed.
    * @throws java.util.NoSuchElementException If the time parameter is not set
+   * @throws IllegalArgumentException If the value can't be interpreted as seconds
--- End diff --

I usually avoid abbreviation in the documentation though. `can't` -> `cannot`.
[GitHub] spark pull request #21440: [SPARK-24307][CORE] Support reading remote cached...
Github user Ngone51 commented on a diff in the pull request: https://github.com/apache/spark/pull/21440#discussion_r191623277 --- Diff: core/src/test/scala/org/apache/spark/io/ChunkedByteBufferFileRegionSuite.scala --- @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.io + +import java.nio.ByteBuffer +import java.nio.channels.WritableByteChannel + +import scala.util.Random + +import org.mockito.Mockito.when +import org.scalatest.BeforeAndAfterEach +import org.scalatest.mockito.MockitoSugar + +import org.apache.spark.{SparkConf, SparkEnv, SparkFunSuite} +import org.apache.spark.internal.config +import org.apache.spark.util.io.ChunkedByteBuffer + +class ChunkedByteBufferFileRegionSuite extends SparkFunSuite with MockitoSugar +with BeforeAndAfterEach { + + override protected def beforeEach(): Unit = { +super.beforeEach() +val conf = new SparkConf() +val env = mock[SparkEnv] +SparkEnv.set(env) +when(env.conf).thenReturn(conf) + } + + override protected def afterEach(): Unit = { +SparkEnv.set(null) + } + + private def generateChunkByteBuffer(nChunks: Int, perChunk: Int): ChunkedByteBuffer = { +val bytes = (0 until nChunks).map { chunkIdx => + val bb = ByteBuffer.allocate(perChunk) + (0 until perChunk).foreach { idx => +bb.put((chunkIdx * perChunk + idx).toByte) + } + bb.position(0) + bb +}.toArray +new ChunkedByteBuffer(bytes) + } + + test("transferTo can stop and resume correctly") { +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, 9L) +val cbb = generateChunkByteBuffer(4, 10) +val fileRegion = cbb.toNetty + +val targetChannel = new LimitedWritableByteChannel(40) + +var pos = 0L +// write the fileregion to the channel, but with the transfer limited at various spots along +// the way. 
+ +// limit to within the first chunk +targetChannel.acceptNBytes = 5 +pos = fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 5) + +// a little bit further within the first chunk +targetChannel.acceptNBytes = 2 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 7) + +// past the first chunk, into the 2nd +targetChannel.acceptNBytes = 6 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 13) + +// right to the end of the 2nd chunk +targetChannel.acceptNBytes = 7 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 20) + +// rest of 2nd chunk, all of 3rd, some of 4th +targetChannel.acceptNBytes = 15 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 35) + +// now till the end +targetChannel.acceptNBytes = 5 +pos += fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + +// calling again at the end should be OK +targetChannel.acceptNBytes = 20 +fileRegion.transferTo(targetChannel, pos) +assert(targetChannel.pos === 40) + } + + test(s"transfer to with random limits") { +val rng = new Random() +val seed = System.currentTimeMillis() +logInfo(s"seed = $seed") +rng.setSeed(seed) +val chunkSize = 1e4.toInt +SparkEnv.get.conf.set(config.BUFFER_WRITE_CHUNK_SIZE, rng.nextInt(chunkSize).toLong) + +val cbb = generateChunkByteBuffer(50, chunkSize) +val fileRegion = cbb.toNetty +val transferLimit = 1e5.toInt +val targetChannel = new LimitedWritableByteChannel(transferLimit) +while (targetChannel.pos < cbb.size) { + val nextTransferSize = rng.nextInt(transferLimit) + targetChannel.acceptNBytes = nextTransferSize + file
[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21437#discussion_r191624414

--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
         TaskAttemptID.
         """
         return self._taskAttemptId
+
+    def getLocalProperty(self, key):
+        """
+        Get a local property set upstream in the driver, or None if it is missing.
--- End diff --

No, it seems not. Shall we change it to `get(key)`?
[GitHub] spark issue #21457: [SPARK-24414][ui] Calculate the correct number of tasks ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21457 Merged build finished. Test PASSed.
[GitHub] spark issue #21457: [SPARK-24414][ui] Calculate the correct number of tasks ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21457 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91276/ Test PASSed.
[GitHub] spark issue #21457: [SPARK-24414][ui] Calculate the correct number of tasks ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21457 **[Test build #91276 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91276/testReport)** for PR 21457 at commit [`40b6cb7`](https://github.com/apache/spark/commit/40b6cb7117598560d91bf6efb148c482eadd8daf).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21383#discussion_r191623502

--- Diff: python/pyspark/util.py ---
@@ -55,7 +55,9 @@ def _get_argspec(f):
     """
     # `getargspec` is deprecated since python3.0 (incompatible with function annotations).
     # See SPARK-23569.
-    if sys.version_info[0] < 3:
+    if hasattr(f, '_argspec'):
--- End diff --

Yea, the current way sounds like a hack .. let's document this, although we have a plan to clean this up soon.
[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21437#discussion_r191622793

--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
         TaskAttemptID.
         """
         return self._taskAttemptId
+
+    def getLocalProperty(self, key):
+        """
+        Get a local property set upstream in the driver, or None if it is missing.
--- End diff --

Wait .. it's a dictionary. Does a dictionary's `get` have a `default` keyword argument?
[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21437#discussion_r191622279

--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
         TaskAttemptID.
         """
         return self._taskAttemptId
+
+    def getLocalProperty(self, key):
+        """
+        Get a local property set upstream in the driver, or None if it is missing.
--- End diff --

Yeah. I added the default=None to make it super obvious. I see no harm in making the code more intuitive.
[GitHub] spark issue #20901: [SPARK-23792][DOCS] Documentation improvements for datet...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20901 Can one of the admins verify this patch?
[GitHub] spark issue #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocalPropert...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21437 Seems fine and I'm okay with it
[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21383#discussion_r191621339

--- Diff: python/pyspark/util.py ---
@@ -55,7 +55,9 @@ def _get_argspec(f):
     """
     # `getargspec` is deprecated since python3.0 (incompatible with function annotations).
     # See SPARK-23569.
-    if sys.version_info[0] < 3:
+    if hasattr(f, '_argspec'):
--- End diff --

Can you add a comment explaining this? Just from this function it's not clear to me why we need `_argspec`.
[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...
Github user icexelloss commented on a diff in the pull request: https://github.com/apache/spark/pull/21383#discussion_r191621391

--- Diff: python/pyspark/sql/udf.py ---
@@ -157,7 +157,17 @@ def _create_judf(self):
         spark = SparkSession.builder.getOrCreate()
         sc = spark.sparkContext

-        wrapped_func = _wrap_function(sc, self.func, self.returnType)
+        func = fail_on_stopiteration(self.func)
+
+        # for pandas UDFs the worker needs to know if the function takes
+        # one or two arguments, but the signature is lost when wrapping with
+        # fail_on_stopiteration, so we store it here
+        if self.evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+                             PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+                             PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF):
+            func._argspec = _get_argspec(self.func)
--- End diff --

I see. Thanks for the clarification.
[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21437#discussion_r191621228

--- Diff: python/pyspark/taskcontext.py ---
@@ -88,3 +89,9 @@ def taskAttemptId(self):
         TaskAttemptID.
         """
         return self._taskAttemptId
+
+    def getLocalProperty(self, key):
+        """
+        Get a local property set upstream in the driver, or None if it is missing.
--- End diff --

get(key) would return `None` tho if it's missing.
[GitHub] spark pull request #21409: [SPARK-24365][SQL] Add Data Source write benchmar...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/21409
[GitHub] spark pull request #21409: [SPARK-24365][SQL] Add Data Source write benchmar...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21409#discussion_r191621006 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/DataSourceWriteBenchmark.scala --- @@ -0,0 +1,145 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.benchmark + +import org.apache.spark.SparkConf +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.util.Benchmark + +/** + * Benchmark to measure data source write performance. + * By default it measures 4 data source format: Parquet, ORC, JSON, CSV: + * spark-submit --class + * To measure specified formats, run it with arguments: + * spark-submit --class format1 [format2] [...] + */ +object DataSourceWriteBenchmark { + val conf = new SparkConf() +.setAppName("DataSourceWriteBenchmark") +.setIfMissing("spark.master", "local[1]") +.set("spark.sql.parquet.compression.codec", "snappy") +.set("spark.sql.orc.compression.codec", "snappy") + + val spark = SparkSession.builder.config(conf).getOrCreate() + + // Set default configs. Individual cases will change them if necessary. 
+ spark.conf.set(SQLConf.WHOLESTAGE_CODEGEN_ENABLED.key, "true") + + val tempTable = "temp" + val numRows = 1024 * 1024 * 15 + + def withTempTable(tableNames: String*)(f: => Unit): Unit = { +try f finally tableNames.foreach(spark.catalog.dropTempView) + } + + def withTable(tableNames: String*)(f: => Unit): Unit = { +try f finally { + tableNames.foreach { name => +spark.sql(s"DROP TABLE IF EXISTS $name") + } +} + } + + def writeInt(table: String, format: String, benchmark: Benchmark): Unit = { +spark.sql(s"create table $table(c1 INT, c2 STRING) using $format") +benchmark.addCase("Output Single Int Column") { _ => + spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " + +s"c1, cast(id as STRING) as c2 from $tempTable") +} + } + + def writeIntString(table: String, format: String, benchmark: Benchmark): Unit = { +spark.sql(s"create table $table(c1 INT, c2 STRING) using $format") +benchmark.addCase("Output Int and String Column") { _ => + spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " + +s"c1, cast(id as STRING) as c2 from $tempTable") +} + } + + def writePartition(table: String, format: String, benchmark: Benchmark): Unit = { +spark.sql(s"create table $table(p INT, id INT) using $format PARTITIONED BY (p)") +benchmark.addCase("Output Partitions") { _ => + spark.sql(s"INSERT overwrite table $table select cast(id as INT) as id," + +s" cast(id % 2 as INT) as p from $tempTable") +} + } + + def writeBucket(table: String, format: String, benchmark: Benchmark): Unit = { +spark.sql(s"create table $table(c1 INT, c2 INT) using $format CLUSTERED BY (c2) INTO 2 BUCKETS") +benchmark.addCase("Output Buckets") { _ => + spark.sql(s"INSERT overwrite table $table select cast(id as INT) as " + +s"c1, cast(id as INT) as c2 from $tempTable") +} + } + + def main(args: Array[String]): Unit = { +val tableInt = "tableInt" +val tableIntString = "tableIntString" +val tablePartition = "tablePartition" +val tableBucket = "tableBucket" +// If the +val formats: Seq[String] = if (args.isEmpty) { + Seq("Parquet", "ORC", "JSON", "CSV") +} else { + args +} +/* +Intel(R) Core(TM) i7-6920HQ CPU @ 2.90GHz +Parquet writer benchmark:Best/Avg Time(ms) Rate(M/s) Per Row(ns) Relative + +Output Single Int Column
[GitHub] spark pull request #21437: [SPARK-24397][PYSPARK] Added TaskContext.getLocal...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21437#discussion_r191620990 --- Diff: python/pyspark/taskcontext.py --- @@ -88,3 +89,9 @@ def taskAttemptId(self): TaskAttemptID. """ return self._taskAttemptId + +def getLocalProperty(self, key): +""" +Get a local property set upstream in the driver, or None if it is missing. --- End diff -- Just get(key) returns `None` if missing tho .. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
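For context, the JVM-side TaskContext already exposes the lookup this PR ports to Python; a minimal Scala sketch of the intended semantics (assumes an existing SparkContext `sc`; the property key is just an example):

import org.apache.spark.TaskContext

// Driver side: local properties set on this thread are shipped with every task it submits.
sc.setLocalProperty("example.tag", "nightly-run")

val tags = sc.parallelize(1 to 4, 2).map { _ =>
  // Executor side: returns the value set upstream, or null when the key was never set
  // (the PySpark method under review returns None in that case).
  TaskContext.get().getLocalProperty("example.tag")
}.collect()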
[GitHub] spark issue #21409: [SPARK-24365][SQL] Add Data Source write benchmark
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/21409 thanks, merging to master! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21288: [SPARK-24206][SQL] Improve FilterPushdownBenchmar...
Github user maropu commented on a diff in the pull request: https://github.com/apache/spark/pull/21288#discussion_r191620766 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/execution/benchmark/FilterPushdownBenchmark.scala --- @@ -131,211 +132,214 @@ object FilterPushdownBenchmark { } /*
+OpenJDK 64-Bit Server VM 1.8.0_171-b10 on Linux 4.14.26-46.32.amzn1.x86_64
 Intel(R) Xeon(R) CPU E5-2686 v4 @ 2.30GHz
 Select 0 string row (value IS NULL):     Best/Avg Time(ms)    Rate(M/s)   Per Row(ns)   Relative
-Parquet Vectorized                            8452 / 8504          1.9         537.3       1.0X
-Parquet Vectorized (Pushdown)                  274 / 281          57.3          17.4      30.8X
-Native ORC Vectorized                         8167 / 8185          1.9         519.3       1.0X
-Native ORC Vectorized (Pushdown)               365 / 379          43.1          23.2      23.1X
+Parquet Vectorized                            2961 / 3123          5.3         188.3       1.0X
+Parquet Vectorized (Pushdown)                 3057 / 3121          5.1         194.4       1.0X
--- End diff -- That might be, but I feel the change was too big... I think I probably made some mistakes in the last benchmark runs (I haven't found out why yet, though). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
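The "(Pushdown)" rows in these tables come from re-running each query with source-level filter pushdown enabled; a minimal sketch of that toggle (config keys from SQLConf; the path and filter are illustrative):

Seq(false, true).foreach { pushdown =>
  spark.conf.set("spark.sql.parquet.filterPushdown", pushdown)
  spark.conf.set("spark.sql.orc.filterPushdown", pushdown)
  // With pushdown enabled, the file source can skip whole row groups/stripes for a
  // selective predicate, which is where the large speedups in the earlier numbers came from.
  spark.read.parquet("/tmp/filter_pushdown_bench").filter("value IS NULL").count()
}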
[GitHub] spark issue #21422: [Spark-24376][doc]Summary:compiling spark with scala-2.1...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21422 Thank you @gentlewangyu. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21383#discussion_r191619371 --- Diff: python/pyspark/util.py --- @@ -55,7 +55,9 @@ def _get_argspec(f): """ # `getargspec` is deprecated since python3.0 (incompatible with function annotations). # See SPARK-23569. -if sys.version_info[0] < 3: +if hasattr(f, '_argspec'): +argspec = f._argspec --- End diff -- @e-dorigatti, sorry, one last comment. Shall we add a short note that we only reach here for Pandas UDFs for now? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21246 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/3690/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21246 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21383: [SPARK-23754][Python] Re-raising StopIteration in...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/21383#discussion_r191619171 --- Diff: python/pyspark/sql/udf.py --- @@ -157,7 +157,17 @@ def _create_judf(self): spark = SparkSession.builder.getOrCreate() sc = spark.sparkContext -wrapped_func = _wrap_function(sc, self.func, self.returnType) +func = fail_on_stopiteration(self.func) + +# for pandas UDFs the worker needs to know if the function takes +# one or two arguments, but the signature is lost when wrapping with +# fail_on_stopiteration, so we store it here +if self.evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF, + PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, + PythonEvalType.SQL_GROUPED_AGG_PANDAS_UDF): +func._argspec = _get_argspec(self.func) --- End diff -- Yup, we definitely support that. The current approach probably wouldn't change anything we supported before. I believe the builtin functions in Python 2 don't work with Pandas UDFs anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21246 **[Test build #91283 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91283/testReport)** for PR 21246 at commit [`6fd8f2f`](https://github.com/apache/spark/commit/6fd8f2fbd37e5193f0ffb1a25a8f4a8c71ab55bd). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191618335 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous.shuffle + +import org.apache.spark.Partitioner +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +/** + * A [[ContinuousShuffleWriter]] sending data to [[RPCContinuousShuffleReader]] instances. + * + * @param writerId The partition ID of this writer. + * @param outputPartitioner The partitioner on the reader side of the shuffle. + * @param endpoints The [[RPCContinuousShuffleReader]] endpoints to write to. Indexed by + * partition ID within outputPartitioner. + */ +class RPCContinuousShuffleWriter( +writerId: Int, +outputPartitioner: Partitioner, +endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter { + + if (outputPartitioner.numPartitions != 1) { --- End diff -- I believe so, but there's no way to test whether it will work until we implement the scheduling support for distributing the addresses of each of the multiple readers. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21246 LGTM pending Jenkins. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21246 I'd retrigger the build for just checking again. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191618218 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous.shuffle + +import org.apache.spark.Partitioner +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +/** + * A [[ContinuousShuffleWriter]] sending data to [[RPCContinuousShuffleReader]] instances. + * + * @param writerId The partition ID of this writer. + * @param outputPartitioner The partitioner on the reader side of the shuffle. + * @param endpoints The [[RPCContinuousShuffleReader]] endpoints to write to. Indexed by + * partition ID within outputPartitioner. + */ +class RPCContinuousShuffleWriter( +writerId: Int, +outputPartitioner: Partitioner, +endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter { + + if (outputPartitioner.numPartitions != 1) { +throw new IllegalArgumentException("multiple readers not yet supported") + } + + if (outputPartitioner.numPartitions != endpoints.length) { +throw new IllegalArgumentException(s"partitioner size ${outputPartitioner.numPartitions} did " + + s"not match endpoint count ${endpoints.length}") + } + + def write(epoch: Iterator[UnsafeRow]): Unit = { +while (epoch.hasNext) { + val row = epoch.next() + endpoints(outputPartitioner.getPartition(row)).ask[Unit](ReceiverRow(writerId, row)) --- End diff -- cc @zsxwing It's my understanding that the RPC framework guarantees messages will be sent in the order that they're ask()ed, and that it's therefore not possible for a single row to fail to be sent while the ones before and after it succeed. If this is the case, then we don't need to handle it here - the query will just start failing to make progress. If it's not the case, we'll need a more clever solution. Maybe have the epoch marker message contain a count for the number of rows that are supposed to be in the epoch? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
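If the ordering guarantee does not hold, the "count in the epoch marker" idea floated above could look roughly like the sketch below. In the PR the epoch marker carries only the writer ID, so the `rowsInEpoch` field and the tracker here are hypothetical:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Hypothetical message shapes: the marker advertises how many rows the writer sent.
case class ReceiverRow(writerId: Int, row: UnsafeRow)
case class ReceiverEpochMarker(writerId: Int, rowsInEpoch: Long)

// Reader-side bookkeeping: the epoch only closes once every writer's marker has arrived
// and its advertised count matches the number of rows actually received.
class EpochCompletenessTracker(numWriters: Int) {
  private val received = new Array[Long](numWriters)
  private val expected = Array.fill[Option[Long]](numWriters)(None)

  def onRow(writerId: Int): Unit = received(writerId) += 1
  def onMarker(m: ReceiverEpochMarker): Unit = expected(m.writerId) = Some(m.rowsInEpoch)

  def epochComplete: Boolean =
    expected.indices.forall(i => expected(i).contains(received(i)))
}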
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191618212 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous.shuffle + +import org.apache.spark.Partitioner +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +/** + * A [[ContinuousShuffleWriter]] sending data to [[RPCContinuousShuffleReader]] instances. + * + * @param writerId The partition ID of this writer. + * @param outputPartitioner The partitioner on the reader side of the shuffle. + * @param endpoints The [[RPCContinuousShuffleReader]] endpoints to write to. Indexed by + * partition ID within outputPartitioner. + */ +class RPCContinuousShuffleWriter( +writerId: Int, --- End diff -- ok makes sense. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21246: [SPARK-23901][SQL] Add masking functions
Github user ueshin commented on the issue: https://github.com/apache/spark/pull/21246 Jenkins, retest this please. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21422: [Spark-24376][doc]Summary:compiling spark with sc...
Github user gentlewangyu closed the pull request at: https://github.com/apache/spark/pull/21422 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21422: [Spark-24376][doc]Summary:compiling spark with scala-2.1...
Github user gentlewangyu commented on the issue: https://github.com/apache/spark/pull/21422 @HyukjinKwon @jerryshao ok, thanks --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191617881 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous.shuffle + +import org.apache.spark.Partitioner +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +/** + * A [[ContinuousShuffleWriter]] sending data to [[RPCContinuousShuffleReader]] instances. + * + * @param writerId The partition ID of this writer. + * @param outputPartitioner The partitioner on the reader side of the shuffle. + * @param endpoints The [[RPCContinuousShuffleReader]] endpoints to write to. Indexed by + * partition ID within outputPartitioner. + */ +class RPCContinuousShuffleWriter( +writerId: Int, --- End diff -- I worry that partitionId is ambiguous with the partition to which the shuffle data is being written. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191613866 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous.shuffle + +import org.apache.spark.Partitioner +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +/** + * A [[ContinuousShuffleWriter]] sending data to [[RPCContinuousShuffleReader]] instances. + * + * @param writerId The partition ID of this writer. + * @param outputPartitioner The partitioner on the reader side of the shuffle. + * @param endpoints The [[RPCContinuousShuffleReader]] endpoints to write to. Indexed by + * partition ID within outputPartitioner. + */ +class RPCContinuousShuffleWriter( +writerId: Int, --- End diff -- nit: rename to `partitionId`? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user arunmahadevan commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191617448 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/shuffle/RPCContinuousShuffleWriter.scala --- @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.streaming.continuous.shuffle + +import org.apache.spark.Partitioner +import org.apache.spark.rpc.RpcEndpointRef +import org.apache.spark.sql.catalyst.expressions.UnsafeRow + +/** + * A [[ContinuousShuffleWriter]] sending data to [[RPCContinuousShuffleReader]] instances. + * + * @param writerId The partition ID of this writer. + * @param outputPartitioner The partitioner on the reader side of the shuffle. + * @param endpoints The [[RPCContinuousShuffleReader]] endpoints to write to. Indexed by + * partition ID within outputPartitioner. + */ +class RPCContinuousShuffleWriter( +writerId: Int, +outputPartitioner: Partitioner, +endpoints: Array[RpcEndpointRef]) extends ContinuousShuffleWriter { + + if (outputPartitioner.numPartitions != 1) { --- End diff -- any reason to disable it ? this should work rt? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21427: [SPARK-24324][PYTHON] Pandas Grouped Map UDF should assi...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/21427 For the configuration, I wasn't sure if we should send the whole configuration map to the worker.py side, if we should change how the command is written, and I was also thinking the current timezone handling is kind of a one-time thing, so I was just wondering if we really should do that. I am okay if that's the only way and I should add a configuration for 2.4. Just for clarification, we should probably remove this configuration in 3.0.0 too. For the current approach, I thought we had better check whether there are other cases that could be broken and see if that makes sense, rather than just blocking this only because there are some behaviour changes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21428: [SPARK-24235][SS] Implement continuous shuffle wr...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/21428#discussion_r191615398 --- Diff: sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/shuffle/ContinuousShuffleSuite.scala --- @@ -288,4 +267,153 @@ class ContinuousShuffleReadSuite extends StreamTest { val thirdEpoch = rdd.compute(rdd.partitions(0), ctx).map(_.getUTF8String(0).toString).toSet assert(thirdEpoch == Set("writer1-row1", "writer2-row0")) } + + test("one epoch") { +val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val writer = new RPCContinuousShuffleWriter( + 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) + +writer.write(Iterator(1, 2, 3)) + +assert(readEpoch(reader) == Seq(1, 2, 3)) + } + + test("multiple epochs") { +val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val writer = new RPCContinuousShuffleWriter( + 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) + +writer.write(Iterator(1, 2, 3)) +writer.write(Iterator(4, 5, 6)) + +assert(readEpoch(reader) == Seq(1, 2, 3)) +assert(readEpoch(reader) == Seq(4, 5, 6)) + } + + test("empty epochs") { +val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val writer = new RPCContinuousShuffleWriter( + 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) + +writer.write(Iterator()) +writer.write(Iterator(1, 2)) +writer.write(Iterator()) +writer.write(Iterator()) +writer.write(Iterator(3, 4)) +writer.write(Iterator()) + +assert(readEpoch(reader) == Seq()) +assert(readEpoch(reader) == Seq(1, 2)) +assert(readEpoch(reader) == Seq()) +assert(readEpoch(reader) == Seq()) +assert(readEpoch(reader) == Seq(3, 4)) +assert(readEpoch(reader) == Seq()) + } + + test("blocks waiting for writer") { +val reader = new ContinuousShuffleReadRDD(sparkContext, numPartitions = 1) +val writer = new RPCContinuousShuffleWriter( + 0, new HashPartitioner(1), Array(readRDDEndpoint(reader))) + +val readerEpoch = reader.compute(reader.partitions(0), ctx) + +val readRowThread = new Thread { + override def run(): Unit = { +assert(readerEpoch.toSeq.map(_.getInt(0)) == Seq(1)) + } +} +readRowThread.start() + +eventually(timeout(streamingTimeout)) { + assert(readRowThread.getState == Thread.State.TIMED_WAITING) +} + +// Once we write the epoch the thread should stop waiting and succeed. +writer.write(Iterator(1)) +readRowThread.join() + } + + test("multiple writer partitions") { +val numWriterPartitions = 3 + +val reader = new ContinuousShuffleReadRDD( + sparkContext, numPartitions = 1, numShuffleWriters = numWriterPartitions) +val writers = (0 until 3).map { idx => + new RPCContinuousShuffleWriter(idx, new HashPartitioner(1), Array(readRDDEndpoint(reader))) +} + +writers(0).write(Iterator(1, 4, 7)) +writers(1).write(Iterator(2, 5)) +writers(2).write(Iterator(3, 6)) + +writers(0).write(Iterator(4, 7, 10)) +writers(1).write(Iterator(5, 8)) +writers(2).write(Iterator(6, 9)) + +// Since there are multiple asynchronous writers, the original row sequencing is not guaranteed. +// The epochs should be deterministically preserved, however. 
+assert(readEpoch(reader).toSet == Seq(1, 2, 3, 4, 5, 6, 7).toSet) +assert(readEpoch(reader).toSet == Seq(4, 5, 6, 7, 8, 9, 10).toSet) + } + + test("reader epoch only ends when all writer partitions write it") { +val numWriterPartitions = 3 + +val reader = new ContinuousShuffleReadRDD( + sparkContext, numPartitions = 1, numShuffleWriters = numWriterPartitions) +val writers = (0 until 3).map { idx => + new RPCContinuousShuffleWriter(idx, new HashPartitioner(1), Array(readRDDEndpoint(reader))) +} + +writers(1).write(Iterator()) +writers(2).write(Iterator()) + +val readerEpoch = reader.compute(reader.partitions(0), ctx) + +val readEpochMarkerThread = new Thread { + override def run(): Unit = { +assert(!readerEpoch.hasNext) + } +} + +readEpochMarkerThread.start() +eventually(timeout(streamingTimeout)) { + assert(readEpochMarkerThread.getState == Thread.State.TIMED_WAITING) +} + +writers(0).write(Iterator()) +readEpochMarkerThread.join() + } + + test("receiver stopped with row last") { +
[GitHub] spark issue #21413: [SPARK-23161][PYSPARK][ML]Add missing APIs to Python GBT...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21413 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/91282/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21413: [SPARK-23161][PYSPARK][ML]Add missing APIs to Python GBT...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21413 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #21413: [SPARK-23161][PYSPARK][ML]Add missing APIs to Python GBT...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/21413 **[Test build #91282 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/91282/testReport)** for PR 21413 at commit [`d8f3906`](https://github.com/apache/spark/commit/d8f3906be4d4178d3c41bff41eaeb39f430ade6b). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21045#discussion_r191611678 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -127,6 +127,165 @@ case class MapKeys(child: Expression) override def prettyName: String = "map_keys" } +@ExpressionDescription( + usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in the N-th position the + N-th value of each array given.""", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4)); +[[1, 2], [2, 3], [3, 4]] + > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4)); +[[1, 2, 3], [2, 3, 4]] + """, + since = "2.4.0") +case class Zip(children: Seq[Expression]) extends Expression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.length)(ArrayType) + + override def dataType: DataType = ArrayType(mountSchema) + + override def nullable: Boolean = children.forall(_.nullable) + + private lazy val arrayTypes = children.map(_.dataType.asInstanceOf[ArrayType]) + + private lazy val arrayElementTypes = arrayTypes.map(_.elementType) + + def mountSchema: StructType = { +val fields = children.zip(arrayElementTypes).zipWithIndex.map { + case ((expr: NamedExpression, elementType), _) => +StructField(expr.name, elementType, nullable = true) + case ((_, elementType), idx) => +StructField(s"$idx", elementType, nullable = true) +} +StructType(fields) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val numberOfArrays: Int = children.length +val genericArrayData = classOf[GenericArrayData].getName +val genericInternalRow = classOf[GenericInternalRow].getName +val arrVals = ctx.freshName("arrVals") +val arrCardinality = ctx.freshName("arrCardinality") +val biggestCardinality = ctx.freshName("biggestCardinality") +val storedArrTypes = ctx.freshName("storedArrTypes") +val returnNull = ctx.freshName("returnNull") +val evals = children.map(_.genCode(ctx)) + +val inputs = evals.zipWithIndex.map { case (eval, index) => + s""" +|${eval.code} +|if (!${eval.isNull}) { +| $arrVals[$index] = ${eval.value}; +| $arrCardinality[$index] = ${eval.value}.numElements(); +|} else { +| $arrVals[$index] = null; +| $arrCardinality[$index] = 0; +| $returnNull[0] = true; +|} +|$storedArrTypes[$index] = "${arrayElementTypes(index)}"; +|$biggestCardinality = Math.max($biggestCardinality, $arrCardinality[$index]); + """.stripMargin +} + +val inputsSplitted = ctx.splitExpressions( + expressions = inputs, + funcName = "getInputAndCardinality", + returnType = "int", + makeSplitFunction = body => +s""" + |$body + |return $biggestCardinality; +""".stripMargin, + foldFunctions = _.map(funcCall => s"$biggestCardinality = $funcCall;").mkString("\n"), + arguments = +("ArrayData[]", arrVals) :: +("int[]", arrCardinality) :: +("String[]", storedArrTypes) :: +("int", biggestCardinality) :: +("boolean[]", returnNull) :: Nil) + +val myobject = ctx.freshName("myobject") +val j = ctx.freshName("j") +val i = ctx.freshName("i") +val args = ctx.freshName("args") + +val cases = arrayElementTypes.distinct.map { elementType => + val getArrValsItem = CodeGenerator.getValue(s"$arrVals[$j]", elementType, i) + s""" +|case "${elementType}": +| $myobject[$j] = $getArrValsItem; +| break; + """.stripMargin +} + +ev.copy(s""" + |ArrayData[] $arrVals = new ArrayData[$numberOfArrays]; + |int[] $arrCardinality = new int[$numberOfArrays]; + |int $biggestCardinality = 0; + |String[] $storedArrTypes = new 
String[$numberOfArrays]; + |boolean[] $returnNull = new boolean[1]; + |$returnNull[0] = false; + |$inputsSplitted + |${CodeGenerator.javaType(dataType)} ${ev.value}; + |boolean ${ev.isNull} = $returnNull[0]; + |if (${ev.isNull}) { + | ${ev.value} = null; + |} else { + | if ($numberOfArrays == 0) { + |${ev.value} = new $genericArrayData(new Object[0]); --- End diff -- We can simplify the generated code when `numberOfArrays == 0` instead of generating the whole code. ---
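One way to act on this suggestion is to branch before any of the per-array machinery is generated; this is only a sketch, with `genCodeForNonEmptyArrays` standing in for the existing body of `doGenCode`:

override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
  if (children.isEmpty) {
    // With no input arrays the result is always a non-null empty array, so the
    // cardinality/type bookkeeping never needs to be emitted.
    val genericArrayData = classOf[GenericArrayData].getName
    ev.copy(s"""
      |boolean ${ev.isNull} = false;
      |${CodeGenerator.javaType(dataType)} ${ev.value} = new $genericArrayData(new Object[0]);
    """.stripMargin)
  } else {
    genCodeForNonEmptyArrays(ctx, ev)  // stand-in for the generation logic quoted above
  }
}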
[GitHub] spark pull request #21045: [SPARK-23931][SQL] Adds zip function to sparksql
Github user ueshin commented on a diff in the pull request: https://github.com/apache/spark/pull/21045#discussion_r191610871 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/collectionOperations.scala --- @@ -127,6 +127,165 @@ case class MapKeys(child: Expression) override def prettyName: String = "map_keys" } +@ExpressionDescription( + usage = """_FUNC_(a1, a2, ...) - Returns a merged array containing in the N-th position the + N-th value of each array given.""", + examples = """ +Examples: + > SELECT _FUNC_(array(1, 2, 3), array(2, 3, 4)); +[[1, 2], [2, 3], [3, 4]] + > SELECT _FUNC_(array(1, 2), array(2, 3), array(3, 4)); +[[1, 2, 3], [2, 3, 4]] + """, + since = "2.4.0") +case class Zip(children: Seq[Expression]) extends Expression with ExpectsInputTypes { + + override def inputTypes: Seq[AbstractDataType] = Seq.fill(children.length)(ArrayType) + + override def dataType: DataType = ArrayType(mountSchema) + + override def nullable: Boolean = children.forall(_.nullable) + + private lazy val arrayTypes = children.map(_.dataType.asInstanceOf[ArrayType]) + + private lazy val arrayElementTypes = arrayTypes.map(_.elementType) + + def mountSchema: StructType = { +val fields = children.zip(arrayElementTypes).zipWithIndex.map { + case ((expr: NamedExpression, elementType), _) => +StructField(expr.name, elementType, nullable = true) + case ((_, elementType), idx) => +StructField(s"$idx", elementType, nullable = true) +} +StructType(fields) + } + + override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { +val numberOfArrays: Int = children.length +val genericArrayData = classOf[GenericArrayData].getName +val genericInternalRow = classOf[GenericInternalRow].getName +val arrVals = ctx.freshName("arrVals") +val arrCardinality = ctx.freshName("arrCardinality") +val biggestCardinality = ctx.freshName("biggestCardinality") +val storedArrTypes = ctx.freshName("storedArrTypes") +val returnNull = ctx.freshName("returnNull") +val evals = children.map(_.genCode(ctx)) + +val inputs = evals.zipWithIndex.map { case (eval, index) => + s""" +|${eval.code} +|if (!${eval.isNull}) { +| $arrVals[$index] = ${eval.value}; +| $arrCardinality[$index] = ${eval.value}.numElements(); +|} else { +| $arrVals[$index] = null; +| $arrCardinality[$index] = 0; +| $returnNull[0] = true; +|} +|$storedArrTypes[$index] = "${arrayElementTypes(index)}"; +|$biggestCardinality = Math.max($biggestCardinality, $arrCardinality[$index]); + """.stripMargin +} + +val inputsSplitted = ctx.splitExpressions( + expressions = inputs, + funcName = "getInputAndCardinality", + returnType = "int", + makeSplitFunction = body => +s""" + |$body + |return $biggestCardinality; +""".stripMargin, + foldFunctions = _.map(funcCall => s"$biggestCardinality = $funcCall;").mkString("\n"), + arguments = +("ArrayData[]", arrVals) :: +("int[]", arrCardinality) :: +("String[]", storedArrTypes) :: +("int", biggestCardinality) :: +("boolean[]", returnNull) :: Nil) + +val myobject = ctx.freshName("myobject") +val j = ctx.freshName("j") +val i = ctx.freshName("i") +val args = ctx.freshName("args") + +val cases = arrayElementTypes.distinct.map { elementType => + val getArrValsItem = CodeGenerator.getValue(s"$arrVals[$j]", elementType, i) + s""" +|case "${elementType}": +| $myobject[$j] = $getArrValsItem; +| break; + """.stripMargin +} + +ev.copy(s""" + |ArrayData[] $arrVals = new ArrayData[$numberOfArrays]; + |int[] $arrCardinality = new int[$numberOfArrays]; + |int $biggestCardinality = 0; + |String[] $storedArrTypes = new 
String[$numberOfArrays]; + |boolean[] $returnNull = new boolean[1]; + |$returnNull[0] = false; + |$inputsSplitted + |${CodeGenerator.javaType(dataType)} ${ev.value}; + |boolean ${ev.isNull} = $returnNull[0]; + |if (${ev.isNull}) { + | ${ev.value} = null; + |} else { + | if ($numberOfArrays == 0) { + |${ev.value} = new $genericArrayData(new Object[0]); + | } else { + |Object[] $args = new Object[$biggestCardinality]; + |for (int $i = 0; $i < $biggestCardinality; $
[GitHub] spark issue #21413: [SPARK-23161][PYSPARK][ML]Add missing APIs to Python GBT...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/21413 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org