[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152912084

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
         "array in the sorter.")
       .intConf
       .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+      .internal()
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Do you think it's necessary to describe how the actual parallelism is calculated here?
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152911936

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
         "array in the sorter.")
       .intConf
       .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+      .internal()
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Oh, I see...nvm. I misread the `+ 1`...
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152911829

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
         "array in the sorter.")
       .intConf
       .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+      .internal()
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Say `statuses.length.toLong * totalSizes.length` is `1001`, for example:

```scala
scala> 1001 / 1001
res0: Int = 1
```

Now, it is more than the threshold, but the parallel aggregation is not enabled...
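For reference, the expression under review divides and then adds one, so the same REPL check with the `+ 1` included (a quick illustration, not from the thread) gives:

```scala
scala> 1001 / 1001 + 1
res1: Int = 2
```

which is why a product equal to the threshold is already enough to enable the parallel aggregation, as a later comment in this thread works out.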
[GitHub] spark issue #19810: [SPARK-22599][SQL] In-Memory Table Pruning without Extra...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19810

**[Test build #84152 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84152/testReport)** for PR 19810 at commit [`accd549`](https://github.com/apache/spark/commit/accd5493464d5bd5f7401285defbd8ca2628ae68).
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152911325

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
         "array in the sorter.")
       .intConf
       .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+      .internal()
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

`statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1 >= 2` -> `statuses.length.toLong * totalSizes.length >= parallelAggThreshold`, so it doesn't need to be 2 times the threshold; reaching 1x is already enough.
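Putting the pieces together, a minimal sketch of the parallelism formula being discussed (the mapper and partition counts are hypothetical; `8L` stands in for `Runtime.getRuntime.availableProcessors()`, and 10^7 is the default threshold mentioned later in this thread):

```scala
val parallelAggThreshold = 10000000L  // default spark.shuffle.mapOutput.parallelAggregationThreshold
val numMappers = 5000L                // hypothetical
val numShufflePartitions = 2000       // hypothetical; product = 1e7 == threshold

val parallelism = math.min(
  8L,                                 // stand-in for Runtime.getRuntime.availableProcessors()
  numMappers * numShufflePartitions / parallelAggThreshold + 1).toInt
// parallelism == 2, so parallel aggregation is enabled exactly at 1x the threshold
```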
[GitHub] spark issue #19809: [SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 to branc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19809

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84150/

Test PASSed.
[GitHub] spark issue #19809: [SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 to branc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19809

**[Test build #84150 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84150/testReport)** for PR 19809 at commit [`9cd03d3`](https://github.com/apache/spark/commit/9cd03d38500f04d8d1ebf8771e79b1ba82d1f79b).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #19809: [SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 to branc...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19809

Merged build finished. Test PASSed.
[GitHub] spark issue #19810: [SQL][SPARK-22599] In-Memory Table Pruning without Extra...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19810

**[Test build #84151 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84151/testReport)** for PR 19810 at commit [`b4f51ed`](https://github.com/apache/spark/commit/b4f51ed0c0774cfea5d26fffcf6abe6f07775a92).
[GitHub] spark pull request #19810: Partition level pruning 2
GitHub user CodingCat opened a pull request: https://github.com/apache/spark/pull/19810

Partition level pruning 2

## What changes were proposed in this pull request?

In the current implementation of Spark, InMemoryTableExec reads all data in a cached table, filters CachedBatches according to stats, and passes data to the downstream operators. This implementation makes it inefficient to reside the whole table in memory to serve various queries against different partitions of the table, which covers a certain portion of our users' scenarios. The following is an example of such a use case:

store_sales is a 1TB-sized table in cloud storage, which is partitioned by 'location'. The first query, Q1, wants to output several metrics A, B, C for all stores in all locations. After that, a small team of 3 data scientists wants to do some causal analysis for the sales in different locations. To avoid unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole table in memory in Q1.

With the current implementation, even if any one of the data scientists is only interested in one out of three locations, the queries they submit to the Spark cluster still read the 1TB of data completely. The reason behind the extra reading operation is that we implement CachedBatch as `case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: InternalRow)`, where `stats` is a part of every CachedBatch, so we can only filter batches for the output of the InMemoryTableExec operator by reading all data in the in-memory table as input. The extra reading would be even more unacceptable when some of the table's data is evicted to disk.

We propose to introduce a new type of block, a metadata block, for the partitions of the RDD representing data in the cached table. Every metadata block contains stats info for all columns in a partition and is saved to BlockManager when executing the compute() method for the partition. To minimize the number of bytes to read,

## How was this patch tested?
(TBD, post it soon)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/CodingCat/spark partition_level_pruning_2

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19810.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19810

commit c3f1a9bb4b53ceef8ad3fb31b3164665549f8bc9
Author: CodingCat
Date: 2016-03-07T14:37:37Z

    improve the doc for "spark.memory.offHeap.size"

commit 6a2b3ca56d26f5fb03d165466e9b6edadeb0adac
Author: CodingCat
Date: 2016-03-07T19:00:16Z

    fix

commit 6e37fa2d06107c175cede4ff1b1a65dd14de7d5a
Author: CodingCat
Date: 2017-10-27T17:36:08Z

    add configuration for partition_metadata

commit aa7066038e23cea7c5eb720aa70cd1a84d6f751f
Author: CodingCat
Date: 2017-10-27T19:49:33Z

    framework of CachedColumnarRDD

commit d1380821d4a9cbb45f5b28c775939e0d130f0922
Author: CodingCat
Date: 2017-10-27T22:53:38Z

    code framework

commit a72d7798b17000250eda0bcc8ae726b7dce7aa0c
Author: CodingCat
Date: 2017-10-30T20:41:18Z

    remove cachedcolumnarbatchRDD

commit 0fe35f82fcd9175b409429c03c9f7b33712df8ae
Author: CodingCat
Date: 2017-10-30T21:12:24Z

    fix styly error

commit 9e342432a82cbe325f338bd787be6427002981e1
Author: CodingCat
Date: 2017-10-31T00:01:53Z

    temp

commit 677ca81709fa34b3cad13f5843b8f408e401476b
Author: CodingCat
Date: 2017-11-01T23:01:49Z

    'CachedColumnarRDD'

commit df1d79620f7c8073b6bc1a119a245c3d5413ec71
Author: CodingCat
Date: 2017-11-01T23:11:52Z

    change types

commit 08fd0857024a192307e66b6c3cffb19ae879000b
Author: CodingCat
Date: 2017-11-01T23:16:58Z

    fix compilation error

commit d4fc2b772b60161a24892133bdae2ff28233250a
Author: CodingCat
Date: 2017-11-02T17:13:15Z

    update

commit 97a63d6d1c1cd81bb97f0b998e716b13f5bd92d9
Author: CodingCat
Date: 2017-11-02T17:21:23Z

    fix storage level

commit a24b7bbfa6c393974d553ba703777934bed85ec1
Author: CodingCat
Date: 2017-11-02T17:42:25Z

    fix getOrCompute

commit 0e8e6395df97b4adeabb5a32ad441b96e5c19eb9
Author: CodingCat
Date: 2017-11-02T17:55:36Z

    evaluate with partition metadata

commit b89d58b26650ce9b63713a8a2371e280986720bb
Author: CodingCat
Date: 2017-11-02T18:21:19Z

    fix getOrCompute

commit 3f2eae73cefd5676c799a6fd1e384556ee6c33a8
Author: CodingCat
Date: 2017-11-02T19:06:49Z

    add
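A toy illustration of the stats-based pruning idea sketched in the PR description above (all names here are hypothetical, not the PR's actual API): if per-partition min/max stats live in separate metadata blocks, a range predicate can discard whole cached partitions without touching their column buffers.

```scala
// Hypothetical per-partition stats for one numeric column.
case class PartitionStats(min: Long, max: Long)

// Keep only partitions whose [min, max] range can overlap the predicate [lo, hi];
// everything else is pruned without reading any cached data.
def prunePartitions(stats: Seq[(Int, PartitionStats)], lo: Long, hi: Long): Seq[Int] =
  stats.collect { case (id, s) if s.max >= lo && s.min <= hi => id }

// e.g. prunePartitions(Seq(0 -> PartitionStats(0, 9), 1 -> PartitionStats(10, 19)), 12, 15)
// returns Seq(1): partition 0 is skipped entirely.
```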
[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user mgaido91 commented on the issue: https://github.com/apache/spark/pull/19518

@kiszk I meant that `janinoc` creates a slightly different constant pool from `javac`. I am not sure about performance, but the number of constant pool entries is definitely different. For instance, let's take this example and analyze the `Outer` class constant pool:

```
class Outer {
    private Inner innerInstance = new Inner();

    private void outerMethod() {
        innerInstance.b = 1;
        innerInstance.a = 1;
    }

    private boolean outerMethod2() {
        return innerInstance.b > innerInstance.a;
    }

    private class Inner {
        int b = 2;
        private int a = 0;
    }
}
```

if you compile it with `javac`, the constant pool will be:

```
Constant pool:
   #1 = Methodref    #9.#25    // java/lang/Object."<init>":()V
   #2 = Class        #26       // Outer$Inner
   #3 = Methodref    #2.#27    // Outer$Inner."<init>":(LOuter;LOuter$1;)V
   #4 = Fieldref     #8.#28    // Outer.innerInstance:LOuter$Inner;
   #5 = Fieldref     #2.#29    // Outer$Inner.b:I
   #6 = Methodref    #2.#30    // Outer$Inner.access$102:(LOuter$Inner;I)I
   #7 = Methodref    #2.#31    // Outer$Inner.access$100:(LOuter$Inner;)I
   #8 = Class        #32       // Outer
   #9 = Class        #33       // java/lang/Object
  #10 = Class        #34       // Outer$1
  #11 = Utf8         InnerClasses
  #12 = Utf8         Inner
  #13 = Utf8         innerInstance
  #14 = Utf8         LOuter$Inner;
  #15 = Utf8         <init>
  #16 = Utf8         ()V
  #17 = Utf8         Code
  #18 = Utf8         LineNumberTable
  #19 = Utf8         outerMethod
  #20 = Utf8         outerMethod2
  #21 = Utf8         ()Z
  #22 = Utf8         StackMapTable
  #23 = Utf8         SourceFile
  #24 = Utf8         Outer.java
  #25 = NameAndType  #15:#16   // "<init>":()V
  #26 = Utf8         Outer$Inner
  #27 = NameAndType  #15:#35   // "<init>":(LOuter;LOuter$1;)V
  #28 = NameAndType  #13:#14   // innerInstance:LOuter$Inner;
  #29 = NameAndType  #36:#37   // b:I
  #30 = NameAndType  #38:#39   // access$102:(LOuter$Inner;I)I
  #31 = NameAndType  #40:#41   // access$100:(LOuter$Inner;)I
  #32 = Utf8         Outer
  #33 = Utf8         java/lang/Object
  #34 = Utf8         Outer$1
  #35 = Utf8         (LOuter;LOuter$1;)V
  #36 = Utf8         b
  #37 = Utf8         I
  #38 = Utf8         access$102
  #39 = Utf8         (LOuter$Inner;I)I
  #40 = Utf8         access$100
  #41 = Utf8         (LOuter$Inner;)I
```

(please note that it creates fake getter and setter method entries for the `private` inner variable `a`). If you compile the same class with `janinoc`, instead, the constant pool will be:

```
Constant pool:
   #1 = Utf8         Outer
   #2 = Class        #1        // Outer
   #3 = Utf8         java/lang/Object
   #4 = Class        #3        // java/lang/Object
   #5 = Utf8         SourceFile
   #6 = Utf8         Outer.java
   #7 = Utf8         outerMethod$
   #8 = Utf8         (LOuter;)V
   #9 = Utf8         innerInstance
  #10 = Utf8         LOuter$Inner;
  #11 = NameAndType  #9:#10    // innerInstance:LOuter$Inner;
  #12 = Fieldref     #2.#11    // Outer.innerInstance:LOuter$Inner;
  #13 = Utf8         Outer$Inner
  #14 = Class        #13       // Outer$Inner
  #15 = Utf8         b
  #16 = Utf8         I
  #17 = NameAndType  #15:#16   // b:I
  #18 = Fieldref     #14.#17   // Outer$Inner.b:I
  #19 = Utf8         a
  #20 = NameAndType  #19:#16   // a:I
  #21 = Fieldref     #14.#20   // Outer$Inner.a:I
  #22 = Utf8         LineNumberTable
  #23 = Utf8         Code
  #24 = Utf8         outerMethod2$
  #25 = Utf8         (LOuter;)Z
  #26 = Utf8         <init>
  #27 = Utf8         ()V
  #28 = NameAndType  #26:#27   // "<init>":()V
  #29 = Methodref    #4.#28    // java/lang/Object."<init>":()V
  #30 = NameAndType  #26:#8    // "<init>":(LOuter;)V
  #31 = Methodref    #14.#30   // Outer$Inner."<init>":(LOuter;)V
  #32 = Utf8         Inner
  #33 = Utf8         InnerClasses
```

(note that `a` now is considered as a regular field). Thus in all our
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152908363

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
         "array in the sorter.")
       .intConf
       .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+      .internal()
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

From the above `statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1`, it looks like we need at least two times this threshold to enable the parallel aggregation?
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152907606

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
         "array in the sorter.")
       .intConf
       .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+      .internal()
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Looks like this parallel aggregation is enabled only when `parallelism` >= 2. Is that equal to `the number of mappers * shuffle partitions >= this threshold`?
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152907079

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
         "array in the sorter.")
       .intConf
       .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+      .internal()
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Sorry, but I didn't get you.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152906960

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
     shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+    val start = range.start
+    val step = range.step
+    val end = range.end
+    for (i <- start.until(end, size * step)) yield {
+      i.until(i + size * step, step)
+    }
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+    val elementsPerBucket = numElements / numBuckets
+    val remaining = numElements % numBuckets
+    val splitPoint = (elementsPerBucket + 1) * remaining
+    if (elementsPerBucket == 0) {
+      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+    } else {
+      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+    }
+  }
+
   /**
    * Return statistics about all of the outputs for a given shuffle.
    */
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
     shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
       val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-      for (s <- statuses) {
-        for (i <- 0 until totalSizes.length) {
-          totalSizes(i) += s.getSizeForBlock(i)
+      val parallelAggThreshold = conf.get(
+        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+      val parallelism = math.min(
+        Runtime.getRuntime.availableProcessors(),
+        statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
+      if (parallelism <= 1) {
+        for (s <- statuses) {
+          for (i <- 0 until totalSizes.length) {
+            totalSizes(i) += s.getSizeForBlock(i)
+          }
+        }
+      } else {
+        val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
--- End diff --

I think we don't need to fully utilize all available processors. `parallelAggThreshold` defaults to 10^7, which corresponds to a relatively small task. Therefore the tasks don't need to be cut smaller in most cases. For the cases where a split is a big task, `parallelAggThreshold` should be tuned. This is not very direct, because you don't have an `xx.parallelism` config to set, but the benefit is that we introduce fewer configs.
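For readers following the `equallyDivide` helper quoted above, a worked example (a standalone restatement of the quoted code, with the buckets it produces shown in a comment):

```scala
def rangeGrouped(range: Range, size: Int): Seq[Range] = {
  val start = range.start
  val step = range.step
  val end = range.end
  for (i <- start.until(end, size * step)) yield {
    i.until(i + size * step, step)
  }
}

def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
  val elementsPerBucket = numElements / numBuckets
  val remaining = numElements % numBuckets
  val splitPoint = (elementsPerBucket + 1) * remaining
  if (elementsPerBucket == 0) {
    rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
  } else {
    rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
      rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
  }
}

// 10 reduce partitions divided over 3 threads: the first bucket gets the one
// extra element, so the result is Seq(0 to 3, 4 to 6, 7 to 9).
// equallyDivide(10, 3) == Seq(Range(0, 1, 2, 3), Range(4, 5, 6), Range(7, 8, 9))
```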
[GitHub] spark issue #19809: [SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 to branc...
Github user vinodkc commented on the issue: https://github.com/apache/spark/pull/19809

ping @cloud-fan
[GitHub] spark issue #19809: [SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 to branc...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19809

**[Test build #84150 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84150/testReport)** for PR 19809 at commit [`9cd03d3`](https://github.com/apache/spark/commit/9cd03d38500f04d8d1ebf8771e79b1ba82d1f79b).
[GitHub] spark pull request #19809: [SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 t...
GitHub user vinodkc opened a pull request: https://github.com/apache/spark/pull/19809

[SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 to branch-2.2

## What changes were proposed in this pull request?

A followup of https://github.com/apache/spark/pull/19795, to simplify the file creation.

## How was this patch tested?

Only the test case is updated.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vinodkc/spark br_FollowupSPARK-17920_branch-2.2

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19809.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19809

commit 9cd03d38500f04d8d1ebf8771e79b1ba82d1f79b
Author: vinodkc
Date: 2017-11-24T05:59:30Z

    simplify the schema file creation in test
[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19808

**[Test build #84149 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84149/testReport)** for PR 19808 at commit [`d1ddede`](https://github.com/apache/spark/commit/d1ddede1ac34deb3732670e7425ec0b84b6d0508).
[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19808

retest this please
[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19808

Merged build finished. Test FAILed.
[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19808

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84147/

Test FAILed.
[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19808

**[Test build #84147 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84147/testReport)** for PR 19808 at commit [`d1ddede`](https://github.com/apache/spark/commit/d1ddede1ac34deb3732670e7425ec0b84b6d0508).

 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19803#discussion_r152899229

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala ---
@@ -108,20 +108,22 @@ trait CodegenSupport extends SparkPlan {
 
   /**
    * Consume the generated columns or row from current SparkPlan, call its parent's `doConsume()`.
+   *
+   * Note that `outputVars` and `row` can't both be null.
    */
   final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: String = null): String = {
     val inputVars =
-      if (row != null) {
+      if (outputVars != null) {
+        assert(outputVars.length == output.length)
--- End diff --

Is it better to add `assert(row == null)` to ensure your comment below?
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19788

Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84148/

Test FAILed.
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19788

**[Test build #84148 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84148/testReport)** for PR 19788 at commit [`e437a26`](https://github.com/apache/spark/commit/e437a269c184f4d36afb4002faf0c6258d0199bd).

 * This patch **fails MiMa tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19788

Merged build finished. Test FAILed.
[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19788

**[Test build #84148 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84148/testReport)** for PR 19788 at commit [`e437a26`](https://github.com/apache/spark/commit/e437a269c184f4d36afb4002faf0c6258d0199bd).
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152896879

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
     shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+    val start = range.start
+    val step = range.step
+    val end = range.end
+    for (i <- start.until(end, size * step)) yield {
+      i.until(i + size * step, step)
+    }
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+    val elementsPerBucket = numElements / numBuckets
+    val remaining = numElements % numBuckets
+    val splitPoint = (elementsPerBucket + 1) * remaining
+    if (elementsPerBucket == 0) {
+      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+    } else {
+      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+    }
+  }
+
   /**
    * Return statistics about all of the outputs for a given shuffle.
    */
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
     shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
       val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-      for (s <- statuses) {
-        for (i <- 0 until totalSizes.length) {
-          totalSizes(i) += s.getSizeForBlock(i)
+      val parallelAggThreshold = conf.get(
+        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+      val parallelism = math.min(
+        Runtime.getRuntime.availableProcessors(),
+        statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1).toInt
+      if (parallelism <= 1) {
+        for (s <- statuses) {
+          for (i <- 0 until totalSizes.length) {
+            totalSizes(i) += s.getSizeForBlock(i)
+          }
+        }
+      } else {
+        val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
--- End diff --

The value of `parallelism` seems to make us not fully utilize all processors at all times? E.g., if `availableProcessors` returns 8 but `parallelism` is 2, we pick 2 as the number of threads.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152896399

--- Diff: core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
         "array in the sorter.")
       .intConf
       .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+    ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+      .internal()
+      .doc("Multi-thread is used when the number of mappers * shuffle partitions is greater than " +
+        "or equal to this threshold.")
--- End diff --

Is this condition to enable parallel aggregation still true?
[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user maropu commented on the issue: https://github.com/apache/spark/pull/19518

I like the latest @kiszk hybrid idea in terms of performance and readability. Also, this is a corner case, so I don't want to affect most regular small queries.
[GitHub] spark pull request #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsa...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19789#discussion_r152895550

--- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala ---
@@ -211,8 +211,8 @@ private[spark] class KafkaRDD[K, V](
     var requestOffset = part.fromOffset
 
     def closeIfNeeded(): Unit = {
-      if (!useConsumerCache && consumer != null) {
-        consumer.close
+      if (consumer != null) {
+          consumer.close()
--- End diff --

I think this should be double spaced
[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19775

Do we have to put this in Spark? Is it a necessary part of k8s? I think if we pull in that PR (https://github.com/apache/spark/pull/11994), then this can stay out of Spark as a package. Even without #11994, I believe users can still add their own Metrics source/sink via the exposed SparkEnv/MetricsSystem. My concern is that this unnecessarily increases the code base of Spark core.
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19803

Merged build finished. Test PASSed.
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19803

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84146/

Test PASSed.
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19803

**[Test build #84146 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84146/testReport)** for PR 19803 at commit [`d674494`](https://github.com/apache/spark/commit/d67449428a107dc840bd801b820c0ec3d39f4241).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19803

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84145/

Test PASSed.
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19803

Merged build finished. Test PASSed.
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19803

**[Test build #84145 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84145/testReport)** for PR 19803 at commit [`7306d5d`](https://github.com/apache/spark/commit/7306d5d6a6fe88d3660b4197eea43f59d4124332).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #19802: [WIP][SPARK-22594][CORE] Handling spark-submit and maste...
Github user jerryshao commented on the issue: https://github.com/apache/spark/pull/19802

Can you please explain more, and how to reproduce this issue? Spark's RPC is not designed to be version compatible.
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r152891920

--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
+    logDebug(s"Fetch block data for $blockId")
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
     val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
+      ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

Also, if length is 1, then this will always be zero.
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r152891792

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -812,10 +812,13 @@ private[spark] object MapOutputTracker extends Logging {
         logError(errorMessage)
         throw new MetadataFetchFailedException(shuffleId, startPartition, errorMessage)
       } else {
+        var totalSize = 0L
         for (part <- startPartition until endPartition) {
-          splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) +=
-            ((ShuffleBlockId(shuffleId, mapId, part), status.getSizeForBlock(part)))
+          totalSize += status.getSizeForBlock(part)
--- End diff --

This can be simplified like: `val totalSize = (startPartition until endPartition).map(status.getSizeForXXX).sum`.
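With the actual method name from the quoted diff plugged in, the suggested one-liner would read (a sketch of the suggestion, not the merged code):

```scala
// Replaces the mutable `var totalSize` accumulation above with a single expression.
val totalSize = (startPartition until endPartition).map(status.getSizeForBlock).sum
```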
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r152891172

--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
+    logDebug(s"Fetch block data for $blockId")
--- End diff --

Not necessary to add this; I guess this is mainly for your own debugging purposes.
[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/19788#discussion_r152891438

--- Diff: core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
     // The block is actually going to be a range of a single map output file for this map, so
     // find out the consolidated file, then the offset within that from our index
+    logDebug(s"Fetch block data for $blockId")
     val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
     val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
     try {
       ByteStreams.skipFully(in, blockId.reduceId * 8)
       val offset = in.readLong()
+      ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

I doubt this line is correct; this seems to change the semantics. For example, if startPartition is 3 and endPartition is 8, originally it should be (3 * 8), but now it changes to (4 * 8). Can you please explain more?
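For context on the arithmetic being questioned: the shuffle index file holds numPartitions + 1 longs, where entry i is the byte offset at which reduce partition i starts. Reduced to an in-memory sketch (illustrative only, not the PR's code):

```scala
// Offsets for a block spanning reduce partitions [reduceId, reduceId + length):
// start = entry reduceId, end = entry (reduceId + length).
def blockRange(index: Array[Long], reduceId: Int, length: Int): (Long, Long) =
  (index(reduceId), index(reduceId + length))

// The streaming version in the diff does the same walk:
//   skipFully(reduceId * 8); readLong()       -> entry reduceId (start offset)
//   skipFully((length - 1) * 8); readLong()   -> entry reduceId + length (end offset)
// e.g. reduceId = 3, length = 5 covers partitions 3..7 and reads entries 3 and 8.
```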
[GitHub] spark pull request #19808: [SPARK-22597][SQL] Add spark-sql cmd script for W...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19808#discussion_r152891440

--- Diff: bin/find-spark-home.cmd ---
@@ -32,7 +32,7 @@ if not "x%PYSPARK_PYTHON%"=="x" (
 )
 
 rem If there is python installed, trying to use the root dir as SPARK_HOME
-where %PYTHON_RUNNER% > nul 2>$1
+where %PYTHON_RUNNER% > nul 2>&1
--- End diff --

This was a mistake. It should be `&`; otherwise it creates an empty file called `$1`.
[GitHub] spark pull request #19808: [SPARK-22597][SQL] Add spark-sql cmd script for W...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19808#discussion_r152891405

--- Diff: bin/spark-sql.cmd ---
@@ -0,0 +1,25 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem    http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem This is the entry point for running SparkSQL. To avoid polluting the
+rem environment, it just launches a new cmd to do the real work.
+
+rem The outermost quotes are used to prevent Windows command line parse error
+rem when there are some quotes in parameters, see SPARK-21877.
+cmd /V /E /C ""%~dp0spark-sql2.cmd" %*"
--- End diff --

A separate script is required; otherwise it will actually set the environment variable `SPARK_HOME`. See SPARK-3943, and I also manually tested.
[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19808

**[Test build #84147 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84147/testReport)** for PR 19808 at commit [`d1ddede`](https://github.com/apache/spark/commit/d1ddede1ac34deb3732670e7425ec0b84b6d0508).
[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19808

cc @cloud-fan, @felixcheung, @jsnowacki and @srowen, who I think are probably interested in this.
[GitHub] spark pull request #19808: [SPARK-22597][SQL] Add spark-sql cmd script for W...
GitHub user HyukjinKwon opened a pull request: https://github.com/apache/spark/pull/19808

[SPARK-22597][SQL] Add spark-sql cmd script for Windows users

## What changes were proposed in this pull request?

This PR proposes to add cmd scripts so that Windows users can also run the `spark-sql` script.

## How was this patch tested?

Manually tested on Windows.

Before:

```cmd
C:\...\spark>.\bin\spark-sql
'.\bin\spark-sql' is not recognized as an internal or external command,
operable program or batch file.

C:\...\spark>.\bin\spark-sql.cmd
'.\bin\spark-sql.cmd' is not recognized as an internal or external command,
operable program or batch file.
```

After:

```cmd
C:\...\spark>.\bin\spark-sql
...
spark-sql> SELECT 'Hello World !!';
...
Hello World !!
Time taken: 4.022 seconds, Fetched 1 row(s)
```

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/HyukjinKwon/spark spark-sql-cmd

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/19808.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #19808

commit d1ddede1ac34deb3732670e7425ec0b84b6d0508
Author: hyukjinkwon
Date: 2017-11-24T01:10:47Z

    Add spark-sql script for Windows users
[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...
Github user tengpeng commented on a diff in the pull request: https://github.com/apache/spark/pull/17819#discussion_r152891005

--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala ---
@@ -108,26 +108,53 @@ final class Bucketizer @Since("1.4.0") (@Since("1.4.0") override val uid: String
       }
     }
 
-    val bucketizer: UserDefinedFunction = udf { (feature: Double) =>
-      Bucketizer.binarySearchForBuckets($(splits), feature, keepInvalid)
-    }.withName("bucketizer")
+    val seqOfSplits = if (isBucketizeMultipleColumns()) {
+      $(splitsArray).toSeq
--- End diff --

I am interested in the motivation for using `.toSeq` and `Seq()` here.
[GitHub] spark issue #19800: [SPARK-22591][SQL] GenerateOrdering shouldn't change Cod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19800

Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84144/

Test PASSed.
[GitHub] spark issue #19800: [SPARK-22591][SQL] GenerateOrdering shouldn't change Cod...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19800

Merged build finished. Test PASSed.
[GitHub] spark issue #19800: [SPARK-22591][SQL] GenerateOrdering shouldn't change Cod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19800

**[Test build #84144 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84144/testReport)** for PR 19800 at commit [`7462a55`](https://github.com/apache/spark/commit/7462a55a4256143c39fdaacfdaf334dfc4ba8b7d).

 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.
[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19518

@mgaido91 Thank you for your questions.

1. I am using `javac` as shown. I am sorry that I cannot understand what you are pointing out. In this benchmark, what are the differences between `javac` and `janinoc`?
2. I agree with you.
[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19518

Based on the performance results and the usage of constant pool entries, I would like to use a hybrid approach with flat global variables and arrays. For example, the first 500 variables are stored into flat global variables, then the others are stored into arrays with 32767 elements. I think that most non-extreme cases can enjoy simple code without array accesses and good performance. WDYT?

```
class Foo {
  int globalVars1;
  int globalVars2;
  ...
  int globalVars499;
  int globalVars500;
  int[] globalArrays1 = new int[32767];
  int[] globalArrays2 = new int[32767];
  int[] globalArrays3 = new int[32767];
  ...

  void apply1(InternalRow i) {
    globalVars1 = 1;
    globalVars2 = 1;
    ...
    globalVars499 = 1;
    globalVars500 = 1;
  }

  void apply2(InternalRow i) {
    globalArrays1[0] = 1;
    globalArrays1[1] = 1;
    ...
  }

  void apply(InternalRow i) {
    apply0(i);
    apply1(i);
    apply2(i);
    ...
  }
}
```
[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...
Github user kiszk commented on the issue: https://github.com/apache/spark/pull/19518

I created and ran another synthetic benchmark program comparing flat global variables, inner global variables, and an array. In summary, the following are the performance results (**smaller is better**):

- 0.65: flat global variables
- 0.91: inner global variables
- 1: array

WDYT? Any comments are very appreciated. Here are [Test.java](https://gist.github.com/kiszk/63c2829488cb777d7ca78d45d20c021f) and [myInstance.py](https://gist.github.com/kiszk/049a62f5d1259481c400a86299bd0228) that I used.

```
$ cat /proc/cpuinfo | grep "model name" | uniq
model name : Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
$ java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-8u131-b11-2ubuntu1.16.04.3-b11)
OpenJDK 64-Bit Server VM (build 25.131-b11, mixed mode)
$ python myInstance.py > MyInstance.java && javac Test.java && java Test

Result(us): Array
  0: 462848.969   1: 461978.693   2: 463174.459   3: 461422.763   4: 460563.915
  5: 460112.262   6: 460059.957   7: 460376.230   8: 460245.445   9: 460308.775
 10: 460154.955  11: 460005.629  12: 460330.584  13: 460277.612  14: 460181.360
 15: 460168.843  16: 459790.137  17: 460248.481  18: 460344.471  19: 460084.529
 20: 459987.263  21: 459961.639  22: 459952.447  23: 460128.518  24: 460025.783
 25: 459874.303  26: 459932.685  27: 460065.736  28: 459954.526  29: 459972.679
BEST: 459790.137000, AVG: 460417.788

Result(us): InnerVars
  0: 421013.480   1: 420279.235   2: 419366.157   3: 421015.934   4: 419540.049
  5: 420316.650   6: 419816.612   7: 420211.140   8: 420215.864   9: 421104.657
 10: 421836.430  11: 420866.894  12: 421457.850  13: 421734.506  14: 420796.010
 15: 419832.910  16: 420012.167  17: 420821.800  18: 420962.178  19: 421981.676
 20: 421721.257  21: 419996.594  22: 419742.884  23: 420158.066  24: 420156.773
 25: 420325.231  26: 420966.914  27: 420787.147  28: 420296.789  29: 420520.843
BEST: 419366.157, AVG: 420595.157

Result(us): Vars
  0: 343490.797   1: 342849.079   2: 341990.967   3: 342844.044   4: 343484.681
  5: 342586.419   6: 342468.883   7: 343113.300   8: 343516.875   9: 343002.395
 10: 341499.538  11: 342192.102  12: 341847.383  13: 342533.215  14: 341376.556
 15: 342018.111  16: 341316.445  17: 342043.378  18: 341969.932  19: 343415.854
 20: 343103.133  21: 342084.686  22: 341555.293  23: 342984.355  24: 342302.336
 25: 341994.372  26: 342475.639  27: 342281.214  28: 342205.175  29: 342462.032
BEST: 341316.445, AVG: 342433.606
```
[GitHub] spark pull request #19807: [SPARK-22495] Fix setup of SPARK_HOME variable on...
Github user HyukjinKwon commented on a diff in the pull request: https://github.com/apache/spark/pull/19807#discussion_r152889444

--- Diff: bin/find-spark-home.cmd ---
@@ -0,0 +1,60 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+rem    http://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem Path to Python script finding SPARK_HOME
+set FIND_SPARK_HOME_PYTHON_SCRIPT=%~dp0find_spark_home.py
+
+rem Default to standard python interpreter unless told otherwise
+set PYTHON_RUNNER=python
+rem If PYSPARK_DRIVER_PYTHON is set, it overwrites the python version
+if not "x%PYSPARK_DRIVER_PYTHON%"=="x" (
+  set PYTHON_RUNNER=%PYSPARK_DRIVER_PYTHON%
+)
+rem If PYSPARK_PYTHON is set, it overwrites the python version
+if not "x%PYSPARK_PYTHON%"=="x" (
+  set PYTHON_RUNNER=%PYSPARK_PYTHON%
+)
+
+rem If there is python installed, trying to use the root dir as SPARK_HOME
+where %PYTHON_RUNNER% > nul 2>$1
--- End diff --

There seems to be a typo here, actually! `where %PYTHON_RUNNER% > nul 2>$1` -> `where %PYTHON_RUNNER% > nul 2>&1`. Will fix it up in master by myself soon.
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152888380

--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
     shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+    val start = range.start
+    val step = range.step
+    val end = range.end
+    for (i <- start.until(end, size * step)) yield {
+      i.until(i + size * step, step)
+    }
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+    val elementsPerBucket = numElements / numBuckets
+    val remaining = numElements % numBuckets
+    val splitPoint = (elementsPerBucket + 1) * remaining
+    if (elementsPerBucket == 0) {
+      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+    } else {
+      rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+        rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+    }
+  }
+
   /**
    * Return statistics about all of the outputs for a given shuffle.
    */
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = {
     shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
       val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-      for (s <- statuses) {
-        for (i <- 0 until totalSizes.length) {
-          totalSizes(i) += s.getSizeForBlock(i)
+      val parallelAggThreshold = conf.get(
+        SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+      val parallelism = math.min(
+        Runtime.getRuntime.availableProcessors(),
+        statuses.length * totalSizes.length / parallelAggThreshold + 1)
+      if (parallelism <= 1) {
+        for (s <- statuses) {
+          for (i <- 0 until totalSizes.length) {
+            totalSizes(i) += s.getSizeForBlock(i)
+          }
+        }
+      } else {
+        try {
+          val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+          implicit val executionContext = ExecutionContext.fromExecutor(threadPool)
+          val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map {
+            reduceIds => Future {
+              for (s <- statuses; i <- reduceIds) {
+                totalSizes(i) += s.getSizeForBlock(i)
+              }
+            }
+          }
+          ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf)
+        } finally {
+          threadpool.shutdown()
--- End diff --

@cloud-fan `We can shut down the pool after some certain idle time, but not sure if it's worth the complexity` I know we don't need to do this now, but if we did, how would we do it?
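On the idle-timeout question above: one standard JDK way (a sketch, not code from the PR) is a fixed-size `ThreadPoolExecutor` whose core threads are allowed to time out, so the pool drains itself after a quiet period instead of needing an explicit shutdown:

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

def idleTimeoutPool(parallelism: Int): ThreadPoolExecutor = {
  val pool = new ThreadPoolExecutor(
    parallelism, parallelism,          // fixed size while busy
    60L, TimeUnit.SECONDS,             // idle threads die after 60 seconds
    new LinkedBlockingQueue[Runnable]())
  pool.allowCoreThreadTimeOut(true)    // let even core threads time out
  pool
}
```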
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user gczsjdy commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152888257 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) } + /** + * Grouped function of Range, this is to avoid traverse of all elements of Range using + * IterableLike's grouped function. + */ + def rangeGrouped(range: Range, size: Int): Seq[Range] = { +val start = range.start +val step = range.step +val end = range.end +for (i <- start.until(end, size * step)) yield { + i.until(i + size * step, step) +} + } + + /** + * To equally divide n elements into m buckets, basically each bucket should have n/m elements, + * for the remaining n%m elements, add one more element to the first n%m buckets each. + */ + def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = { +val elementsPerBucket = numElements / numBuckets +val remaining = numElements % numBuckets +val splitPoint = (elementsPerBucket + 1) * remaining +if (elementsPerBucket == 0) { + rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) +} else { + rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++ +rangeGrouped(splitPoint.until(numElements), elementsPerBucket) +} + } + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) + val parallelAggThreshold = conf.get( +SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD) + val parallelism = math.min( +Runtime.getRuntime.availableProcessors(), +statuses.length * totalSizes.length / parallelAggThreshold + 1) + if (parallelism <= 1) { +for (s <- statuses) { + for (i <- 0 until totalSizes.length) { +totalSizes(i) += s.getSizeForBlock(i) + } +} + } else { +try { + val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate") + implicit val executionContext = ExecutionContext.fromExecutor(threadPool) + val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map { +reduceIds => Future { + for (s <- statuses; i <- reduceIds) { +totalSizes(i) += s.getSizeForBlock(i) + } +} + } + ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf) +} finally { + threadpool.shutdown() --- End diff -- My fault! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19806: [SPARK-22595][SQL] fix flaky test: CastSuite.SPAR...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19806#discussion_r152886393 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala --- @@ -829,7 +829,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { } test("SPARK-22500: cast for struct should not generate codes beyond 64KB") { -val N = 250 +val N = 25 --- End diff -- Yes, I confirmed that 25 can reproduce the bug of SPARK-22500 by reverting the change of #19730. For example, 20 cannot reproduce the bug. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
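For context: the 64KB limit applies to a single JVM method's bytecode, and the generated cast emits a chunk of code per struct field, so code size grows roughly linearly with N. A rough sketch of the kind of wide struct cast involved (an assumed shape for illustration, not the exact CastSuite construction):

```scala
import org.apache.spark.sql.functions.{lit, struct}

val N = 25
// Each of the N fields contributes its own slice of generated cast code;
// before the splitExpressions fix in #19730 all slices landed in one
// generated method, which could exceed the JVM's 64KB bytecode limit.
val wideStruct = struct((1 to N).map(i => lit(i.toString).as(s"c$i")): _*)
val casted = wideStruct.cast(s"struct<${(1 to N).map(i => s"c$i:int").mkString(",")}>")
```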
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19803 **[Test build #84146 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84146/testReport)** for PR 19803 at commit [`d674494`](https://github.com/apache/spark/commit/d67449428a107dc840bd801b820c0ec3d39f4241). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19803 **[Test build #84145 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84145/testReport)** for PR 19803 at commit [`7306d5d`](https://github.com/apache/spark/commit/7306d5d6a6fe88d3660b4197eea43f59d4124332). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19803#discussion_r152881282 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala --- @@ -60,20 +60,23 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean) override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val javaType = ctx.javaType(dataType) -val value = ctx.getValue(ctx.INPUT_ROW, dataType, ordinal.toString) if (ctx.currentVars != null && ctx.currentVars(ordinal) != null) { val oev = ctx.currentVars(ordinal) ev.isNull = oev.isNull ev.value = oev.value - val code = oev.code - oev.code = "" - ev.copy(code = code) -} else if (nullable) { - ev.copy(code = s""" -boolean ${ev.isNull} = ${ctx.INPUT_ROW}.isNullAt($ordinal); -$javaType ${ev.value} = ${ev.isNull} ? ${ctx.defaultValue(dataType)} : ($value);""") + ev.copy(code = oev.code) } else { - ev.copy(code = s"""$javaType ${ev.value} = $value;""", isNull = "false") + assert(ctx.INPUT_ROW != null) --- End diff -- Add an assert message. `assert(ctx.INPUT_ROW != null, "INPUT_ROW and currentVars cannot both be null.")` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19803#discussion_r152881786 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala --- @@ -60,20 +60,23 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean) override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val javaType = ctx.javaType(dataType) --- End diff -- Also move this line to the else block. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19803#discussion_r152883588 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -355,19 +355,12 @@ case class FileSourceScanExec( // PhysicalRDD always just has one input val input = ctx.freshName("input") ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") -val exprRows = output.zipWithIndex.map{ case (a, i) => - BoundReference(i, a.dataType, a.nullable) -} val row = ctx.freshName("row") -ctx.INPUT_ROW = row -ctx.currentVars = null -val columnsRowInput = exprRows.map(_.genCode(ctx)) -val inputRow = if (needsUnsafeRowConversion) null else row --- End diff -- good catch! --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19803#discussion_r152883562 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -56,9 +56,7 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { -val exprs = projectList.map(x => - ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output))) -ctx.currentVars = input +val exprs = projectList.map(x => BindReferences.bindReference[Expression](x, child.output)) --- End diff -- it doesn't matter, `Alias.genCode` is just a pass-through. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
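A toy model of that pass-through (self-contained, not Spark's actual classes): an alias contributes a name but no computation, so generating code for the alias is exactly generating code for its child, which is why dropping `ExpressionCanonicalizer` does not change the generated code for aliased projections.

```scala
trait Expr { def genCode: String }
case class Lit(v: Int) extends Expr { def genCode: String = v.toString }
case class Alias(child: Expr, name: String) extends Expr {
  def genCode: String = child.genCode  // no code of its own
}

assert(Alias(Lit(42), "x").genCode == Lit(42).genCode)  // both generate "42"
```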
[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19803#discussion_r152882655 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala --- @@ -355,19 +355,12 @@ case class FileSourceScanExec( // PhysicalRDD always just has one input val input = ctx.freshName("input") ctx.addMutableState("scala.collection.Iterator", input, s"$input = inputs[0];") -val exprRows = output.zipWithIndex.map{ case (a, i) => - BoundReference(i, a.dataType, a.nullable) -} val row = ctx.freshName("row") -ctx.INPUT_ROW = row -ctx.currentVars = null -val columnsRowInput = exprRows.map(_.genCode(ctx)) -val inputRow = if (needsUnsafeRowConversion) null else row --- End diff -- When `needsUnsafeRowConversion` is true, we previously passed in null and `consume` would generate code for an unsafe row projection. Now we always pass in `row`, which may not be an unsafe row. This looks like a behavior change? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
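For readers following along: the concern is that downstream operators may hold onto rows and therefore need `UnsafeRow`, while a columnar scan can emit other `InternalRow` implementations. The conversion that the old `null` argument used to trigger looks roughly like this (a sketch against catalyst's projection API; the two-column schema is hypothetical):

```scala
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{DataType, IntegerType, StringType}

// toUnsafe copies any InternalRow (e.g. a row view over a ColumnarBatch)
// into a fresh UnsafeRow that downstream operators can safely retain.
val toUnsafe = UnsafeProjection.create(Array[DataType](IntegerType, StringType))
// val unsafeRow = toUnsafe(row)  // row: InternalRow produced by the scan
```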
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/19803 Two comments, LGTM otherwise. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19803#discussion_r152882876 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala --- @@ -56,9 +56,7 @@ case class ProjectExec(projectList: Seq[NamedExpression], child: SparkPlan) } override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: ExprCode): String = { -val exprs = projectList.map(x => - ExpressionCanonicalizer.execute(BindReferences.bindReference(x, child.output))) -ctx.currentVars = input +val exprs = projectList.map(x => BindReferences.bindReference[Expression](x, child.output)) --- End diff -- This gets rid of `ExpressionCanonicalizer`. Isn't it possible we have `Alias` in `projectList` here? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19807: [SPARK-22495] Fix setup of SPARK_HOME variable on Window...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19807 LGTM pending AppVeyor tests. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19807: [SPARK-22495] Fix setup of SPARK_HOME variable on Window...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19807 Build started: [SparkR] `ALL` [![PR-19807](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=4A7B521C-83BF-4F47-84AF-94D49BCBF40E&svg=true)](https://ci.appveyor.com/project/spark-test/spark/branch/4A7B521C-83BF-4F47-84AF-94D49BCBF40E) Diff: https://github.com/apache/spark/compare/branch-2.2...spark-test:4A7B521C-83BF-4F47-84AF-94D49BCBF40E --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19800: [SPARK-22591][SQL] GenerateOrdering shouldn't change Cod...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19800 **[Test build #84144 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84144/testReport)** for PR 19800 at commit [`7462a55`](https://github.com/apache/spark/commit/7462a55a4256143c39fdaacfdaf334dfc4ba8b7d). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19803#discussion_r152881704 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala --- @@ -60,20 +60,23 @@ case class BoundReference(ordinal: Int, dataType: DataType, nullable: Boolean) override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { val javaType = ctx.javaType(dataType) -val value = ctx.getValue(ctx.INPUT_ROW, dataType, ordinal.toString) if (ctx.currentVars != null && ctx.currentVars(ordinal) != null) { val oev = ctx.currentVars(ordinal) ev.isNull = oev.isNull ev.value = oev.value - val code = oev.code - oev.code = "" - ev.copy(code = code) --- End diff -- Great, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19800: [SPARK-22591][SQL] GenerateOrdering shouldn't cha...
Github user viirya commented on a diff in the pull request: https://github.com/apache/spark/pull/19800#discussion_r152881639 --- Diff: sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala --- @@ -149,10 +151,14 @@ object GenerateOrdering extends CodeGenerator[Seq[SortOrder], Ordering[InternalR }) // make sure INPUT_ROW is declared even if splitExpressions // returns an inlined block -s""" +val finalCode = s""" |InternalRow ${ctx.INPUT_ROW} = null; |$code """.stripMargin +// Restore original currentVars and INPUT_ROW. +ctx.currentVars = oldCurrentVars +ctx.INPUT_ROW = oldInputRow +finalCode --- End diff -- Yes, thanks. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
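The fix is an instance of a save/restore discipline for the codegen context's mutable state. A generic helper capturing the same idea (hypothetical — Spark writes this inline rather than through a helper) might look like:

```scala
import org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext

// Run `body`, then restore the caller's currentVars and INPUT_ROW even if
// code generation throws, so a nested generator cannot leak state upward.
def withSavedInputState[T](ctx: CodegenContext)(body: => T): T = {
  val savedVars = ctx.currentVars
  val savedRow = ctx.INPUT_ROW
  try body finally {
    ctx.currentVars = savedVars
    ctx.INPUT_ROW = savedRow
  }
}
```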
[GitHub] spark pull request #19801: [SPARK-22592][SQL] cleanup filter converting for ...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/19801 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19801: [SPARK-22592][SQL] cleanup filter converting for hive
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19801 Thanks! Merged to master. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19801: [SPARK-22592][SQL] cleanup filter converting for hive
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/19801 LGTM --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19801: [SPARK-22592][SQL] cleanup filter converting for ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19801#discussion_r152880052 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -660,40 +626,68 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { } def unapply(values: Set[Any]): Option[Seq[String]] = { -values.toSeq.foldLeft(Option(Seq.empty[String])) { - case (Some(accum), value) if valueToLiteralString.isDefinedAt(value) => -Some(accum :+ valueToLiteralString(value)) - case _ => None +val extractables = values.toSeq.map(valueToLiteralString.lift) +if (extractables.nonEmpty && extractables.forall(_.isDefined)) { --- End diff -- The same here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
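The `valueToLiteralString.lift` in the diff above uses `PartialFunction.lift`, which turns a partial function into a total one returning `Option`, so unconvertible values become `None` instead of throwing. A standalone illustration (the Int-only partial function is hypothetical):

```scala
val valueToLiteralString: PartialFunction[Any, String] = {
  case i: Int => i.toString  // pretend only Ints are extractable
}
val lifted: Any => Option[String] = valueToLiteralString.lift

lifted(1)    // Some("1"): covered by the partial function
lifted("x")  // None: not covered, no MatchError thrown
```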
[GitHub] spark pull request #19801: [SPARK-22592][SQL] cleanup filter converting for ...
Github user gatorsmile commented on a diff in the pull request: https://github.com/apache/spark/pull/19801#discussion_r152880027 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala --- @@ -643,9 +607,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 { object ExtractableLiterals { def unapply(exprs: Seq[Expression]): Option[Seq[String]] = { - exprs.map(ExtractableLiteral.unapply).foldLeft(Option(Seq.empty[String])) { - case (Some(accum), Some(value)) => Some(accum :+ value) - case _ => None +val extractables = exprs.map(ExtractableLiteral.unapply) +if (extractables.nonEmpty && extractables.forall(_.isDefined)) { --- End diff -- -> `if (extractables.exists(_.isDefined))` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
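To make the rewritten guard's semantics concrete (a standalone illustration, not Spark code): it accepts only a non-empty sequence in which every element extracted successfully, matching what the old `foldLeft` computed.

```scala
def allExtracted(xs: Seq[Option[String]]): Boolean =
  xs.nonEmpty && xs.forall(_.isDefined)

allExtracted(Seq(Some("1"), Some("2")))  // true: every expression extracted
allExtracted(Seq(Some("1"), None))       // false: one extraction failed
allExtracted(Seq.empty)                  // false: nothing to push down
```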
[GitHub] spark pull request #19806: [SPARK-22595][SQL] fix flaky test: CastSuite.SPAR...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19806#discussion_r152879501 --- Diff: sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala --- @@ -829,7 +829,7 @@ class CastSuite extends SparkFunSuite with ExpressionEvalHelper { } test("SPARK-22500: cast for struct should not generate codes beyond 64KB") { -val N = 250 +val N = 25 --- End diff -- is 25 big enough to reproduce the bug of SPARK-22500? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19789 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19789 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84143/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19789 **[Test build #84143 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84143/testReport)** for PR 19789 at commit [`c2c3ed9`](https://github.com/apache/spark/commit/c2c3ed9e2e672b3b9205a824b8cdd2f9ca2131eb). * This patch passes all tests. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `class CachedKafkaConsumerSuite extends SparkFunSuite with BeforeAndAfterAll ` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19789 **[Test build #84143 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84143/testReport)** for PR 19789 at commit [`c2c3ed9`](https://github.com/apache/spark/commit/c2c3ed9e2e672b3b9205a824b8cdd2f9ca2131eb). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19807: [SPARK-22495] Fix setup of SPARK_HOME variable on Window...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19807 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19807: [SPARK-22495] Fix setup of SPARK_HOME variable on Window...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19807 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84142/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19807: [SPARK-22495] Fix setup of SPARK_HOME variable on Window...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19807 **[Test build #84142 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84142/testReport)** for PR 19807 at commit [`bd24e47`](https://github.com/apache/spark/commit/bd24e470227437a52b07d95335579f1afcdda905). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/19789 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19806: [SPARK-22595][SQL] fix flaky test: CastSuite.SPARK-22500...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19806 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19806: [SPARK-22595][SQL] fix flaky test: CastSuite.SPARK-22500...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19806 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84141/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19806: [SPARK-22595][SQL] fix flaky test: CastSuite.SPARK-22500...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19806 **[Test build #84141 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84141/testReport)** for PR 19806 at commit [`48103dd`](https://github.com/apache/spark/commit/48103dd26817629cc00619ea8659d14533f53fb5). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19763#discussion_r152870781 --- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala --- @@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster( shuffleStatuses.get(shuffleId).map(_.findMissingPartitions()) } + /** + * Grouped function of Range, this is to avoid traverse of all elements of Range using + * IterableLike's grouped function. + */ + def rangeGrouped(range: Range, size: Int): Seq[Range] = { +val start = range.start +val step = range.step +val end = range.end +for (i <- start.until(end, size * step)) yield { + i.until(i + size * step, step) +} + } + + /** + * To equally divide n elements into m buckets, basically each bucket should have n/m elements, + * for the remaining n%m elements, add one more element to the first n%m buckets each. + */ + def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = { +val elementsPerBucket = numElements / numBuckets +val remaining = numElements % numBuckets +val splitPoint = (elementsPerBucket + 1) * remaining +if (elementsPerBucket == 0) { + rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) +} else { + rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++ +rangeGrouped(splitPoint.until(numElements), elementsPerBucket) +} + } + /** * Return statistics about all of the outputs for a given shuffle. */ def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics = { shuffleStatuses(dep.shuffleId).withMapStatuses { statuses => val totalSizes = new Array[Long](dep.partitioner.numPartitions) - for (s <- statuses) { -for (i <- 0 until totalSizes.length) { - totalSizes(i) += s.getSizeForBlock(i) + val parallelAggThreshold = conf.get( +SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD) + val parallelism = math.min( +Runtime.getRuntime.availableProcessors(), +statuses.length * totalSizes.length / parallelAggThreshold + 1) + if (parallelism <= 1) { +for (s <- statuses) { + for (i <- 0 until totalSizes.length) { +totalSizes(i) += s.getSizeForBlock(i) + } +} + } else { +try { + val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate") + implicit val executionContext = ExecutionContext.fromExecutor(threadPool) + val mapStatusSubmitTasks = equallyDivide(totalSizes.length, parallelism).map { +reduceIds => Future { + for (s <- statuses; i <- reduceIds) { +totalSizes(i) += s.getSizeForBlock(i) + } +} + } + ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), Duration.Inf) +} finally { + threadpool.shutdown() --- End diff -- ah good catch! I misread it... --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
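One subtlety in the quoted computation: `statuses.length * totalSizes.length` multiplies two `Int`s and can overflow for very large shuffles, which would corrupt the parallelism estimate; promoting one operand to `Long` first avoids that:

```scala
val mappers = 100000         // hypothetical large shuffle
val partitions = 100000
mappers * partitions         // Int overflow: 1410065408, not 10000000000
mappers.toLong * partitions  // 10000000000L, as intended
```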
[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19763 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19763 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84134/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19763 **[Test build #84134 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84134/testReport)** for PR 19763 at commit [`72c3d97`](https://github.com/apache/spark/commit/72c3d97e6e2f2c50504c5e4d8b80ea595797b044). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19803 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84137/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19803 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19803 **[Test build #84137 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84137/testReport)** for PR 19803 at commit [`6758a34`](https://github.com/apache/spark/commit/6758a34aa6bd528a8fa8ab7d5db8964d1121f848). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19803 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19803 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84138/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org