[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152912084
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Do you think it's necessary to indicate the actual parallelism's 
calculation way here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152911936
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Oh, I see...nvm. I misread the `+ 1`...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152911829
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Says `statuses.length.toLong * totalSizes.length` is `1001`, for 
example:

```scala
scala> 1001 / 1001
res0: Int = 1
```

Now, it is more than the threshold, but the parallel aggregation is not 
enabled...



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19810: [SPARK-22599][SQL] In-Memory Table Pruning without Extra...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19810
  
**[Test build #84152 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84152/testReport)**
 for PR 19810 at commit 
[`accd549`](https://github.com/apache/spark/commit/accd5493464d5bd5f7401285defbd8ca2628ae68).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152911325
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

`statuses.length.toLong * totalSizes.length / parallelAggThreshold + 1` >= 
2 -> `statuses.length.toLong * totalSizes.length >= parallelAggThreshold`, so 
it doesn't need to be 2 times, just not smaller than 1x is good.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19809: [SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 to branc...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19809
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84150/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19809: [SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 to branc...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19809
  
**[Test build #84150 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84150/testReport)**
 for PR 19809 at commit 
[`9cd03d3`](https://github.com/apache/spark/commit/9cd03d38500f04d8d1ebf8771e79b1ba82d1f79b).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19809: [SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 to branc...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19809
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19810: [SQL][SPARK-22599] In-Memory Table Pruning without Extra...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19810
  
**[Test build #84151 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84151/testReport)**
 for PR 19810 at commit 
[`b4f51ed`](https://github.com/apache/spark/commit/b4f51ed0c0774cfea5d26fffcf6abe6f07775a92).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19810: Partition level pruning 2

2017-11-23 Thread CodingCat
GitHub user CodingCat opened a pull request:

https://github.com/apache/spark/pull/19810

Partition level pruning 2

## What changes were proposed in this pull request?

In the current implementation of Spark, InMemoryTableExec read all data in 
a cached table, filter CachedBatch according to stats and pass data to the 
downstream operators. This implementation makes it inefficient to reside the 
whole table in memory to serve various queries against different partitions of 
the table, which occupies a certain portion of our users' scenarios.

The following is an example of such a use case:

store_sales is a 1TB-sized table in cloud storage, which is partitioned by 
'location'. The first query, Q1, wants to output several metrics A, B, C for 
all stores in all locations. After that, a small team of 3 data scientists 
wants to do some causal analysis for the sales in different locations. To avoid 
unnecessary I/O and parquet/orc parsing overhead, they want to cache the whole 
table in memory in Q1.

With the current implementation, even any one of the data scientists is 
only interested in one out of three locations, the queries they submit to Spark 
cluster is still reading 1TB data completely.

The reason behind the extra reading operation is that we implement 
CachedBatch as

case class CachedBatch(numRows: Int, buffers: Array[Array[Byte]], stats: 
InternalRow)
where the stats is a part of every CachedBatch, so we can only filter 
batches for output of InMemoryTableExec operator by reading all data in 
in-memory table as input. The extra reading would be even more unacceptable 
when some of the table's data is evicted to disks.

We propose to introduce a new type of block, metadata block, for the 
partitions of RDD representing data in the cached table. Every metadata block 
contains stats info for all columns in a partition and is saved to BlockManager 
when executing compute() method for the partition. To minimize the number of 
bytes to read,

## How was this patch tested?

(TBD, post it soon)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/CodingCat/spark partition_level_pruning_2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19810.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19810


commit c3f1a9bb4b53ceef8ad3fb31b3164665549f8bc9
Author: CodingCat 
Date:   2016-03-07T14:37:37Z

improve the doc for "spark.memory.offHeap.size"

commit 6a2b3ca56d26f5fb03d165466e9b6edadeb0adac
Author: CodingCat 
Date:   2016-03-07T19:00:16Z

fix

commit 6e37fa2d06107c175cede4ff1b1a65dd14de7d5a
Author: CodingCat 
Date:   2017-10-27T17:36:08Z

add configuration for partition_metadata

commit aa7066038e23cea7c5eb720aa70cd1a84d6f751f
Author: CodingCat 
Date:   2017-10-27T19:49:33Z

framework of CachedColumnarRDD

commit d1380821d4a9cbb45f5b28c775939e0d130f0922
Author: CodingCat 
Date:   2017-10-27T22:53:38Z

code framework

commit a72d7798b17000250eda0bcc8ae726b7dce7aa0c
Author: CodingCat 
Date:   2017-10-30T20:41:18Z

remove cachedcolumnarbatchRDD

commit 0fe35f82fcd9175b409429c03c9f7b33712df8ae
Author: CodingCat 
Date:   2017-10-30T21:12:24Z

fix styly error

commit 9e342432a82cbe325f338bd787be6427002981e1
Author: CodingCat 
Date:   2017-10-31T00:01:53Z

temp

commit 677ca81709fa34b3cad13f5843b8f408e401476b
Author: CodingCat 
Date:   2017-11-01T23:01:49Z

'CachedColumnarRDD'

commit df1d79620f7c8073b6bc1a119a245c3d5413ec71
Author: CodingCat 
Date:   2017-11-01T23:11:52Z

change types

commit 08fd0857024a192307e66b6c3cffb19ae879000b
Author: CodingCat 
Date:   2017-11-01T23:16:58Z

fix compilation error

commit d4fc2b772b60161a24892133bdae2ff28233250a
Author: CodingCat 
Date:   2017-11-02T17:13:15Z

update

commit 97a63d6d1c1cd81bb97f0b998e716b13f5bd92d9
Author: CodingCat 
Date:   2017-11-02T17:21:23Z

fix storage level

commit a24b7bbfa6c393974d553ba703777934bed85ec1
Author: CodingCat 
Date:   2017-11-02T17:42:25Z

fix getOrCompute

commit 0e8e6395df97b4adeabb5a32ad441b96e5c19eb9
Author: CodingCat 
Date:   2017-11-02T17:55:36Z

evaluate with partition metadata

commit b89d58b26650ce9b63713a8a2371e280986720bb
Author: CodingCat 
Date:   2017-11-02T18:21:19Z

fix getOrCompute

commit 3f2eae73cefd5676c799a6fd1e384556ee6c33a8
Author: CodingCat 
Date:   2017-11-02T19:06:49Z

add 

[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...

2017-11-23 Thread mgaido91
Github user mgaido91 commented on the issue:

https://github.com/apache/spark/pull/19518
  
@kiszk I meant that `janinoc` creates a slightly different constant pool 
from `javac`. I am not sure about performances, but the number of constant pool 
entries is definitely different. For instance, let's take this example and 
analyze the `Outer` class constant pool:
```
class Outer {
  private Inner innerInstance = new Inner();
  private void outerMethod(){
innerInstance.b = 1;
innerInstance.a = 1;
  }
  private boolean outerMethod2(){
return innerInstance.b > innerInstance.a;
  }
  private class Inner {
int b =2;
private int a = 0;
  }
}
```
if you compile it with `javac`, the constant pool will be:
```
Constant pool:
   #1 = Methodref  #9.#25 // java/lang/Object."":()V
   #2 = Class  #26// Outer$Inner
   #3 = Methodref  #2.#27 // 
Outer$Inner."":(LOuter;LOuter$1;)V
   #4 = Fieldref   #8.#28 // 
Outer.innerInstance:LOuter$Inner;
   #5 = Fieldref   #2.#29 // Outer$Inner.b:I
   #6 = Methodref  #2.#30 // 
Outer$Inner.access$102:(LOuter$Inner;I)I
   #7 = Methodref  #2.#31 // 
Outer$Inner.access$100:(LOuter$Inner;)I
   #8 = Class  #32// Outer
   #9 = Class  #33// java/lang/Object
  #10 = Class  #34// Outer$1
  #11 = Utf8   InnerClasses
  #12 = Utf8   Inner
  #13 = Utf8   innerInstance
  #14 = Utf8   LOuter$Inner;
  #15 = Utf8   
  #16 = Utf8   ()V
  #17 = Utf8   Code
  #18 = Utf8   LineNumberTable
  #19 = Utf8   outerMethod
  #20 = Utf8   outerMethod2
  #21 = Utf8   ()Z
  #22 = Utf8   StackMapTable
  #23 = Utf8   SourceFile
  #24 = Utf8   Outer.java
  #25 = NameAndType#15:#16// "":()V
  #26 = Utf8   Outer$Inner
  #27 = NameAndType#15:#35// "":(LOuter;LOuter$1;)V
  #28 = NameAndType#13:#14// innerInstance:LOuter$Inner;
  #29 = NameAndType#36:#37// b:I
  #30 = NameAndType#38:#39// access$102:(LOuter$Inner;I)I
  #31 = NameAndType#40:#41// access$100:(LOuter$Inner;)I
  #32 = Utf8   Outer
  #33 = Utf8   java/lang/Object
  #34 = Utf8   Outer$1
  #35 = Utf8   (LOuter;LOuter$1;)V
  #36 = Utf8   b
  #37 = Utf8   I
  #38 = Utf8   access$102
  #39 = Utf8   (LOuter$Inner;I)I
  #40 = Utf8   access$100
  #41 = Utf8   (LOuter$Inner;)I
```
(please note that it creates a fake getter and a fake setter method entries 
for the `private` inner variable `a`). 

If you compile the same class with `janinoc`, instead, the constant pool 
will be:
```
Constant pool:
   #1 = Utf8   Outer
   #2 = Class  #1 // Outer
   #3 = Utf8   java/lang/Object
   #4 = Class  #3 // java/lang/Object
   #5 = Utf8   SourceFile
   #6 = Utf8   Outer.java
   #7 = Utf8   outerMethod$
   #8 = Utf8   (LOuter;)V
   #9 = Utf8   innerInstance
  #10 = Utf8   LOuter$Inner;
  #11 = NameAndType#9:#10 // innerInstance:LOuter$Inner;
  #12 = Fieldref   #2.#11 // 
Outer.innerInstance:LOuter$Inner;
  #13 = Utf8   Outer$Inner
  #14 = Class  #13// Outer$Inner
  #15 = Utf8   b
  #16 = Utf8   I
  #17 = NameAndType#15:#16// b:I
  #18 = Fieldref   #14.#17// Outer$Inner.b:I
  #19 = Utf8   a
  #20 = NameAndType#19:#16// a:I
  #21 = Fieldref   #14.#20// Outer$Inner.a:I
  #22 = Utf8   LineNumberTable
  #23 = Utf8   Code
  #24 = Utf8   outerMethod2$
  #25 = Utf8   (LOuter;)Z
  #26 = Utf8   
  #27 = Utf8   ()V
  #28 = NameAndType#26:#27// "":()V
  #29 = Methodref  #4.#28 // java/lang/Object."":()V
  #30 = NameAndType#26:#8 // "":(LOuter;)V
  #31 = Methodref  #14.#30// Outer$Inner."":(LOuter;)V
  #32 = Utf8   Inner
  #33 = Utf8   InnerClasses
```
(note that `a` now is considered as a regular field).

Thus in all our 

[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152908363
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

From above `statuses.length.toLong * totalSizes.length / 
parallelAggThreshold + 1`, looks like we need to have at least two times of 
this threshold to enable this parallel aggregation?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152907606
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Looks like only `parallelism` >= 2, this parallel aggregation is enabled. 
Is it equal to `the number of mappers * shuffle partitions >= this threshold`?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152907079
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Sorry, but didn't get you.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152906960
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length.toLong * totalSizes.length / parallelAggThreshold 
+ 1).toInt
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, 
"map-output-aggregate")
--- End diff --

I think we don't need to fully utilize all available processors. 
`parallelAggThreshold` is default to be 10^7, which means a relatively small 
task to deal with. Therefore the tasks don't need to be cut smaller in most 
cases. 
For some cases where the split is a big task, `parallelAggThreshold` should 
be tuned. This is not very direct because you don't have a `xx.parallelism` 
config to set, but the benefit is we introduced less configs.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19809: [SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 to branc...

2017-11-23 Thread vinodkc
Github user vinodkc commented on the issue:

https://github.com/apache/spark/pull/19809
  
ping @cloud-fan 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19809: [SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 to branc...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19809
  
**[Test build #84150 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84150/testReport)**
 for PR 19809 at commit 
[`9cd03d3`](https://github.com/apache/spark/commit/9cd03d38500f04d8d1ebf8771e79b1ba82d1f79b).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19809: [SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 t...

2017-11-23 Thread vinodkc
GitHub user vinodkc opened a pull request:

https://github.com/apache/spark/pull/19809

[SPARK-17920][SQL] [FOLLOWUP] Backport PR 19779 to branch-2.2

## What changes were proposed in this pull request?

A followup of 

> https://github.com/apache/spark/pull/19795

 , to simplify the file creation.

## How was this patch tested?

Only test case is updated


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/vinodkc/spark 
br_FollowupSPARK-17920_branch-2.2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19809.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19809


commit 9cd03d38500f04d8d1ebf8771e79b1ba82d1f79b
Author: vinodkc 
Date:   2017-11-24T05:59:30Z

simplify the schema file creation in test




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19808
  
**[Test build #84149 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84149/testReport)**
 for PR 19808 at commit 
[`d1ddede`](https://github.com/apache/spark/commit/d1ddede1ac34deb3732670e7425ec0b84b6d0508).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...

2017-11-23 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19808
  
retest this please


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19808
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19808
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84147/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19808
  
**[Test build #84147 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84147/testReport)**
 for PR 19808 at commit 
[`d1ddede`](https://github.com/apache/spark/commit/d1ddede1ac34deb3732670e7425ec0b84b6d0508).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...

2017-11-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19803#discussion_r152899229
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenExec.scala
 ---
@@ -108,20 +108,22 @@ trait CodegenSupport extends SparkPlan {
 
   /**
* Consume the generated columns or row from current SparkPlan, call its 
parent's `doConsume()`.
+   *
+   * Note that `outputVars` and `row` can't both be null.
*/
   final def consume(ctx: CodegenContext, outputVars: Seq[ExprCode], row: 
String = null): String = {
 val inputVars =
-  if (row != null) {
+  if (outputVars != null) {
+assert(outputVars.length == output.length)
--- End diff --

Is it better to add `assert(row == null)` to ensure your comment below?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19788
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84148/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19788
  
**[Test build #84148 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84148/testReport)**
 for PR 19788 at commit 
[`e437a26`](https://github.com/apache/spark/commit/e437a269c184f4d36afb4002faf0c6258d0199bd).
 * This patch **fails MiMa tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19788
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19788: [SPARK-9853][Core] Optimize shuffle fetch of contiguous ...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19788
  
**[Test build #84148 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84148/testReport)**
 for PR 19788 at commit 
[`e437a26`](https://github.com/apache/spark/commit/e437a269c184f4d36afb4002faf0c6258d0199bd).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152896879
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length.toLong * totalSizes.length / parallelAggThreshold 
+ 1).toInt
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+val threadPool = ThreadUtils.newDaemonFixedThreadPool(parallelism, 
"map-output-aggregate")
--- End diff --

The value of `parallelism` seems making us not fully utilize all processors 
at all time? E.g, if `availableProcessors` returns 8, but `parallelism` is 2, 
we pick 2 as number of threads.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152896399
  
--- Diff: 
core/src/main/scala/org/apache/spark/internal/config/package.scala ---
@@ -485,4 +485,13 @@ package object config {
 "array in the sorter.")
   .intConf
   .createWithDefault(Integer.MAX_VALUE)
+
+  private[spark] val SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD =
+ConfigBuilder("spark.shuffle.mapOutput.parallelAggregationThreshold")
+  .internal()
+  .doc("Multi-thread is used when the number of mappers * shuffle 
partitions is greater than " +
+"or equal to this threshold.")
--- End diff --

Is this condition to enable parallel aggregation still true?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...

2017-11-23 Thread maropu
Github user maropu commented on the issue:

https://github.com/apache/spark/pull/19518
  
I like the latest @kiszk hybrid idea in terms of performance and 
readability. Also, this is a corner case, so  I don't want affect most regular 
small queries.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsa...

2017-11-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19789#discussion_r152895550
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 ---
@@ -211,8 +211,8 @@ private[spark] class KafkaRDD[K, V](
 var requestOffset = part.fromOffset
 
 def closeIfNeeded(): Unit = {
-  if (!useConsumerCache && consumer != null) {
-consumer.close
+  if (consumer != null) {
+  consumer.close()
--- End diff --

I think this should be double spaced


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19775: [SPARK-22343][core] Add support for publishing Spark met...

2017-11-23 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19775
  
Do we have to put this in Spark, is it a necessary part of k8s? I think if 
we pull in that PR(https://github.com/apache/spark/pull/11994), then this can 
be stayed out of Spark as a package. Even without #11994 , I believe users can 
still add their own Metrics source/sink via exposed SparkEnv/MetricsSystem. My 
concern is that this unnecessarily increases the code base of spark core.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19803
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19803
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84146/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19803
  
**[Test build #84146 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84146/testReport)**
 for PR 19803 at commit 
[`d674494`](https://github.com/apache/spark/commit/d67449428a107dc840bd801b820c0ec3d39f4241).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19803
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84145/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19803
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19803
  
**[Test build #84145 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84145/testReport)**
 for PR 19803 at commit 
[`7306d5d`](https://github.com/apache/spark/commit/7306d5d6a6fe88d3660b4197eea43f59d4124332).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19802: [WIP][SPARK-22594][CORE] Handling spark-submit and maste...

2017-11-23 Thread jerryshao
Github user jerryshao commented on the issue:

https://github.com/apache/spark/pull/19802
  
Can you please explain more, and how to reproduce this issue? Spark's RPC 
is not designed for version compatible.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891920
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output 
file for this map, so
 // find out the consolidated file, then the offset within that from 
our index
+logDebug(s"Fetch block data for $blockId")
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
 val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
 try {
   ByteStreams.skipFully(in, blockId.reduceId * 8)
   val offset = in.readLong()
+  ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

Also if length is "1", then this will always be Zero.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891792
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -812,10 +812,13 @@ private[spark] object MapOutputTracker extends 
Logging {
 logError(errorMessage)
 throw new MetadataFetchFailedException(shuffleId, startPartition, 
errorMessage)
   } else {
+var totalSize = 0L
 for (part <- startPartition until endPartition) {
-  splitsByAddress.getOrElseUpdate(status.location, ArrayBuffer()) 
+=
-((ShuffleBlockId(shuffleId, mapId, part), 
status.getSizeForBlock(part)))
+  totalSize += status.getSizeForBlock(part)
--- End diff --

This can be simplified like: `val totalSize = (startPartition until 
endPartition).map(status.getSizeForXXX).sum`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891172
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output 
file for this map, so
 // find out the consolidated file, then the offset within that from 
our index
+logDebug(s"Fetch block data for $blockId")
--- End diff --

Not necessary to add this, I guess this is mainly for your debug purpose.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19788: [SPARK-9853][Core] Optimize shuffle fetch of cont...

2017-11-23 Thread jerryshao
Github user jerryshao commented on a diff in the pull request:

https://github.com/apache/spark/pull/19788#discussion_r152891438
  
--- Diff: 
core/src/main/scala/org/apache/spark/shuffle/IndexShuffleBlockResolver.scala ---
@@ -196,12 +196,14 @@ private[spark] class IndexShuffleBlockResolver(
   override def getBlockData(blockId: ShuffleBlockId): ManagedBuffer = {
 // The block is actually going to be a range of a single map output 
file for this map, so
 // find out the consolidated file, then the offset within that from 
our index
+logDebug(s"Fetch block data for $blockId")
 val indexFile = getIndexFile(blockId.shuffleId, blockId.mapId)
 
 val in = new DataInputStream(Files.newInputStream(indexFile.toPath))
 try {
   ByteStreams.skipFully(in, blockId.reduceId * 8)
   val offset = in.readLong()
+  ByteStreams.skipFully(in, (blockId.length - 1) * 8)
--- End diff --

I doubt this line is not correct, this seems change the semantics, for 
example if startPartition is 3, endPartition is 8, originally it should be 
(3\*8), now it changes to (4\*8), can you please explain more?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19808: [SPARK-22597][SQL] Add spark-sql cmd script for W...

2017-11-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19808#discussion_r152891440
  
--- Diff: bin/find-spark-home.cmd ---
@@ -32,7 +32,7 @@ if not "x%PYSPARK_PYTHON%"=="x" (
 )
 
 rem If there is python installed, trying to use the root dir as SPARK_HOME
-where %PYTHON_RUNNER% > nul 2>$1
+where %PYTHON_RUNNER% > nul 2>&1
--- End diff --

This was a mistake. It should be `&` otherwise creates an empty file called 
`$1`.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19808: [SPARK-22597][SQL] Add spark-sql cmd script for W...

2017-11-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19808#discussion_r152891405
  
--- Diff: bin/spark-sql.cmd ---
@@ -0,0 +1,25 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+remhttp://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem This is the entry point for running SparkSQL. To avoid polluting the
+rem environment, it just launches a new cmd to do the real work.
+
+rem The outermost quotes are used to prevent Windows command line parse 
error
+rem when there are some quotes in parameters, see SPARK-21877.
+cmd /V /E /C ""%~dp0spark-sql2.cmd" %*"
--- End diff --

Separate script is required. Otherwise, it will actually set the 
environment variable, `SPARK_HOME`. See SPARK-3943 and I also manually tested.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19808
  
**[Test build #84147 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84147/testReport)**
 for PR 19808 at commit 
[`d1ddede`](https://github.com/apache/spark/commit/d1ddede1ac34deb3732670e7425ec0b84b6d0508).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19808: [SPARK-22597][SQL] Add spark-sql cmd script for Windows ...

2017-11-23 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19808
  
cc @cloud-fan, @felixcheung, @jsnowacki and @srowen who I could think are 
probably interested in this.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19808: [SPARK-22597][SQL] Add spark-sql cmd script for W...

2017-11-23 Thread HyukjinKwon
GitHub user HyukjinKwon opened a pull request:

https://github.com/apache/spark/pull/19808

[SPARK-22597][SQL] Add spark-sql cmd script for Windows users

## What changes were proposed in this pull request?

This PR proposes to add cmd scripts so that Windows users can also run 
`spark-sql` script.

## How was this patch tested?

Manually tested on Windows.

```cmd
C:\...\spark>.\bin\spark-sql
'.\bin\spark-sql' is not recognized as an internal or external command,
operable program or batch file.

C:\...\spark>.\bin\spark-sql.cmd
'.\bin\spark-sql.cmd' is not recognized as an internal or external command,
operable program or batch file.
```


```cmd
C:\...\spark>.\bin\spark-sql
...
spark-sql> SELECT 'Hello World !!';
...
Hello World !!
Time taken: 4.022 seconds, Fetched 1 row(s)
```

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/HyukjinKwon/spark spark-sql-cmd

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/19808.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #19808


commit d1ddede1ac34deb3732670e7425ec0b84b6d0508
Author: hyukjinkwon 
Date:   2017-11-24T01:10:47Z

Add spark-sql script for Windows users




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #17819: [SPARK-20542][ML][SQL] Add an API to Bucketizer t...

2017-11-23 Thread tengpeng
Github user tengpeng commented on a diff in the pull request:

https://github.com/apache/spark/pull/17819#discussion_r152891005
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/feature/Bucketizer.scala 
---
@@ -108,26 +164,53 @@ final class Bucketizer @Since("1.4.0") 
(@Since("1.4.0") override val uid: String
   }
 }
 
-val bucketizer: UserDefinedFunction = udf { (feature: Double) =>
-  Bucketizer.binarySearchForBuckets($(splits), feature, keepInvalid)
-}.withName("bucketizer")
+val seqOfSplits = if (isBucketizeMultipleColumns()) {
+  $(splitsArray).toSeq
--- End diff --

I am interested in the motivation of using `.toSeq` and `Seq()` here


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19800: [SPARK-22591][SQL] GenerateOrdering shouldn't change Cod...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19800
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84144/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19800: [SPARK-22591][SQL] GenerateOrdering shouldn't change Cod...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19800
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19800: [SPARK-22591][SQL] GenerateOrdering shouldn't change Cod...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19800
  
**[Test build #84144 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84144/testReport)**
 for PR 19800 at commit 
[`7462a55`](https://github.com/apache/spark/commit/7462a55a4256143c39fdaacfdaf334dfc4ba8b7d).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...

2017-11-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/19518
  
@mgaido91 Thank you for your questions.
1. I am using `javac` as shown. I am sorry that I cannot understand what 
you are pointing out. In this benchmark, what are differences between `javac` 
and `janinoc`?
2. I agree with you.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...

2017-11-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/19518
  
Based on performance results and usage of constant pool entry, I would like 
to use hybrid approach with flat global variable and array.  
For example, first 500 variables are stored into flat global variables, 
then others are stored into arrays with 32767 elements. I think that most of 
non-extreme cases can enjoy simple code without array accesses and good 
performance.

WDYT?

```
class Foo {
  int globalVars1;
  int globalVars2;
  ...
  int globalVars499;
  int globalVars500;
  int[] globalArrays1 = new int[32767];
  int[] globalArrays2 = new int[32767];
  int[] globalArrays3 = new int[32767];
  ...

  void apply1(InternalRow i) {
globalVars1 = 1;
globalVars2 = 1;
...
globalVars499 = 1;
globalVars500 = 1;
  }

  void apply2(InternalRow i) {
globalArrays1[0] = 1;
globalArrays1[1] = 1;
...
  }

  void apply(InternalRow i) {
apply0(i);
apply1(i);
apply2(i);
...
  }
}
```


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19518: [SPARK-18016][SQL][CATALYST] Code Generation: Constant P...

2017-11-23 Thread kiszk
Github user kiszk commented on the issue:

https://github.com/apache/spark/pull/19518
  
I created and ran another synthetic benchmark program for comparing flat 
global variables, inner global variables, and array.  In summary, the 
followings are performance results (**small number is better**).
- 0.65: flat global variables
- 0.91: inner global variables
- 1: array

WDYT? Any comments are very appreciated.

Here are 
[Test.java](https://gist.github.com/kiszk/63c2829488cb777d7ca78d45d20c021f) and 
[myInsntance.py](https://gist.github.com/kiszk/049a62f5d1259481c400a86299bd0228)
 that I used.

```
$ cat /proc/cpuinfo | grep "model name" | uniq
model name  : Intel(R) Xeon(R) CPU E5-2667 v3 @ 3.20GHz
$ java -version
openjdk version "1.8.0_131"
OpenJDK Runtime Environment (build 1.8.0_131-8u131-b11-2ubuntu1.16.04.3-b11)
OpenJDK 64-Bit Server VM (build 25.131-b11, mixed mode)
$ python myInstance.py > MyInstance.java && javac Test.java && java Test

Result(us): Array
   0: 462848.969
   1: 461978.693
   2: 463174.459
   3: 461422.763
   4: 460563.915
   5: 460112.262
   6: 460059.957
   7: 460376.230
   8: 460245.445
   9: 460308.775
  10: 460154.955
  11: 460005.629
  12: 460330.584
  13: 460277.612
  14: 460181.360
  15: 460168.843
  16: 459790.137
  17: 460248.481
  18: 460344.471
  19: 460084.529
  20: 459987.263
  21: 459961.639
  22: 459952.447
  23: 460128.518
  24: 460025.783
  25: 459874.303
  26: 459932.685
  27: 460065.736
  28: 459954.526
  29: 459972.679
BEST: 459790.137000, AVG: 460417.788

Result(us): InnerVars
   0: 421013.480
   1: 420279.235
   2: 419366.157
   3: 421015.934
   4: 419540.049
   5: 420316.650
   6: 419816.612
   7: 420211.140
   8: 420215.864
   9: 421104.657
  10: 421836.430
  11: 420866.894
  12: 421457.850
  13: 421734.506
  14: 420796.010
  15: 419832.910
  16: 420012.167
  17: 420821.800
  18: 420962.178
  19: 421981.676
  20: 421721.257
  21: 419996.594
  22: 419742.884
  23: 420158.066
  24: 420156.773
  25: 420325.231
  26: 420966.914
  27: 420787.147
  28: 420296.789
  29: 420520.843
BEST: 419366.157, AVG: 420595.157

Result(us): Vars
   0: 343490.797
   1: 342849.079
   2: 341990.967
   3: 342844.044
   4: 343484.681
   5: 342586.419
   6: 342468.883
   7: 343113.300
   8: 343516.875
   9: 343002.395
  10: 341499.538
  11: 342192.102
  12: 341847.383
  13: 342533.215
  14: 341376.556
  15: 342018.111
  16: 341316.445
  17: 342043.378
  18: 341969.932
  19: 343415.854
  20: 343103.133
  21: 342084.686
  22: 341555.293
  23: 342984.355
  24: 342302.336
  25: 341994.372
  26: 342475.639
  27: 342281.214
  28: 342205.175
  29: 342462.032
BEST: 341316.445, AVG: 342433.606
```



---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19807: [SPARK-22495] Fix setup of SPARK_HOME variable on...

2017-11-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/19807#discussion_r152889444
  
--- Diff: bin/find-spark-home.cmd ---
@@ -0,0 +1,60 @@
+@echo off
+
+rem
+rem Licensed to the Apache Software Foundation (ASF) under one or more
+rem contributor license agreements.  See the NOTICE file distributed with
+rem this work for additional information regarding copyright ownership.
+rem The ASF licenses this file to You under the Apache License, Version 2.0
+rem (the "License"); you may not use this file except in compliance with
+rem the License.  You may obtain a copy of the License at
+rem
+remhttp://www.apache.org/licenses/LICENSE-2.0
+rem
+rem Unless required by applicable law or agreed to in writing, software
+rem distributed under the License is distributed on an "AS IS" BASIS,
+rem WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied.
+rem See the License for the specific language governing permissions and
+rem limitations under the License.
+rem
+
+rem Path to Python script finding SPARK_HOME
+set FIND_SPARK_HOME_PYTHON_SCRIPT=%~dp0find_spark_home.py
+
+rem Default to standard python interpreter unless told otherwise
+set PYTHON_RUNNER=python
+rem If PYSPARK_DRIVER_PYTHON is set, it overwrites the python version
+if not "x%PYSPARK_DRIVER_PYTHON%"=="x" (
+  set PYTHON_RUNNER=%PYSPARK_DRIVER_PYTHON%
+)
+rem If PYSPARK_PYTHON is set, it overwrites the python version
+if not "x%PYSPARK_PYTHON%"=="x" (
+  set PYTHON_RUNNER=%PYSPARK_PYTHON%
+)
+
+rem If there is python installed, trying to use the root dir as SPARK_HOME
+where %PYTHON_RUNNER% > nul 2>$1
--- End diff --

There seems a typo here actually !!  `where %PYTHON_RUNNER% > nul 2>$1` -> 
`where %PYTHON_RUNNER% > nul 2>&1`.

Will fix it up in master by myself soon.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152888380
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

@cloud-fan `We can shut down the pool after some certain idle time, but not 
sure if it's worth the complexity` I know we don't need to do this now. But if 
we did it how to do?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread gczsjdy
Github user gczsjdy commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152888257
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

My fault!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19806: [SPARK-22595][SQL] fix flaky test: CastSuite.SPAR...

2017-11-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19806#discussion_r152886393
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
 ---
@@ -829,7 +829,7 @@ class CastSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("SPARK-22500: cast for struct should not generate codes beyond 
64KB") {
-val N = 250
+val N = 25
--- End diff --

Yes, I confirmed that 25 can reproduce the bug of SPARK-22500 by reverting 
the change of #19730. For example, 20 cannot reproduce the bug.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19803
  
**[Test build #84146 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84146/testReport)**
 for PR 19803 at commit 
[`d674494`](https://github.com/apache/spark/commit/d67449428a107dc840bd801b820c0ec3d39f4241).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19803
  
**[Test build #84145 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84145/testReport)**
 for PR 19803 at commit 
[`7306d5d`](https://github.com/apache/spark/commit/7306d5d6a6fe88d3660b4197eea43f59d4124332).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...

2017-11-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19803#discussion_r152881282
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
 ---
@@ -60,20 +60,23 @@ case class BoundReference(ordinal: Int, dataType: 
DataType, nullable: Boolean)
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
 val javaType = ctx.javaType(dataType)
-val value = ctx.getValue(ctx.INPUT_ROW, dataType, ordinal.toString)
 if (ctx.currentVars != null && ctx.currentVars(ordinal) != null) {
   val oev = ctx.currentVars(ordinal)
   ev.isNull = oev.isNull
   ev.value = oev.value
-  val code = oev.code
-  oev.code = ""
-  ev.copy(code = code)
-} else if (nullable) {
-  ev.copy(code = s"""
-boolean ${ev.isNull} = ${ctx.INPUT_ROW}.isNullAt($ordinal);
-$javaType ${ev.value} = ${ev.isNull} ? 
${ctx.defaultValue(dataType)} : ($value);""")
+  ev.copy(code = oev.code)
 } else {
-  ev.copy(code = s"""$javaType ${ev.value} = $value;""", isNull = 
"false")
+  assert(ctx.INPUT_ROW != null)
--- End diff --

Add an assert message. 
`assert(ctx.INPUT_ROW != null, "INPUT_ROW and currentVars cannot both be 
null.")`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...

2017-11-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19803#discussion_r152881786
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
 ---
@@ -60,20 +60,23 @@ case class BoundReference(ordinal: Int, dataType: 
DataType, nullable: Boolean)
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
 val javaType = ctx.javaType(dataType)
--- End diff --

Also move this line to the else block. 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...

2017-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19803#discussion_r152883588
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -355,19 +355,12 @@ case class FileSourceScanExec(
 // PhysicalRDD always just has one input
 val input = ctx.freshName("input")
 ctx.addMutableState("scala.collection.Iterator", input, s"$input = 
inputs[0];")
-val exprRows = output.zipWithIndex.map{ case (a, i) =>
-  BoundReference(i, a.dataType, a.nullable)
-}
 val row = ctx.freshName("row")
-ctx.INPUT_ROW = row
-ctx.currentVars = null
-val columnsRowInput = exprRows.map(_.genCode(ctx))
-val inputRow = if (needsUnsafeRowConversion) null else row
--- End diff --

good catch!


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...

2017-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19803#discussion_r152883562
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -56,9 +56,7 @@ case class ProjectExec(projectList: Seq[NamedExpression], 
child: SparkPlan)
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
-val exprs = projectList.map(x =>
-  ExpressionCanonicalizer.execute(BindReferences.bindReference(x, 
child.output)))
-ctx.currentVars = input
+val exprs = projectList.map(x => 
BindReferences.bindReference[Expression](x, child.output))
--- End diff --

it doesn't matter, `Alias.genCode` is just a pass-through.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19803#discussion_r152882655
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/DataSourceScanExec.scala 
---
@@ -355,19 +355,12 @@ case class FileSourceScanExec(
 // PhysicalRDD always just has one input
 val input = ctx.freshName("input")
 ctx.addMutableState("scala.collection.Iterator", input, s"$input = 
inputs[0];")
-val exprRows = output.zipWithIndex.map{ case (a, i) =>
-  BoundReference(i, a.dataType, a.nullable)
-}
 val row = ctx.freshName("row")
-ctx.INPUT_ROW = row
-ctx.currentVars = null
-val columnsRowInput = exprRows.map(_.genCode(ctx))
-val inputRow = if (needsUnsafeRowConversion) null else row
--- End diff --

When `needsUnsafeRowConversion` is true, previously we pass in a null and 
`consume` will generate code for a unsafe row projection. Now we always pass in 
the `row` which can possibly not a unsafe row. This looks changing behavior?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/19803
  
Two comments, LGTM otherwise.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19803#discussion_r152882876
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/basicPhysicalOperators.scala
 ---
@@ -56,9 +56,7 @@ case class ProjectExec(projectList: Seq[NamedExpression], 
child: SparkPlan)
   }
 
   override def doConsume(ctx: CodegenContext, input: Seq[ExprCode], row: 
ExprCode): String = {
-val exprs = projectList.map(x =>
-  ExpressionCanonicalizer.execute(BindReferences.bindReference(x, 
child.output)))
-ctx.currentVars = input
+val exprs = projectList.map(x => 
BindReferences.bindReference[Expression](x, child.output))
--- End diff --

This gets rid of `ExpressionCanonicalizer`. Isn't it possibly we have 
`Alias` in `projectList` here?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19807: [SPARK-22495] Fix setup of SPARK_HOME variable on Window...

2017-11-23 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19807
  
LGTM pending AppVeyor tests.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19807: [SPARK-22495] Fix setup of SPARK_HOME variable on Window...

2017-11-23 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19807
  
Build started: [SparkR] `ALL` 
[![PR-19807](https://ci.appveyor.com/api/projects/status/github/spark-test/spark?branch=4A7B521C-83BF-4F47-84AF-94D49BCBF40E=true)](https://ci.appveyor.com/project/spark-test/spark/branch/4A7B521C-83BF-4F47-84AF-94D49BCBF40E)
Diff: 
https://github.com/apache/spark/compare/branch-2.2...spark-test:4A7B521C-83BF-4F47-84AF-94D49BCBF40E


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19800: [SPARK-22591][SQL] GenerateOrdering shouldn't change Cod...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19800
  
**[Test build #84144 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84144/testReport)**
 for PR 19800 at commit 
[`7462a55`](https://github.com/apache/spark/commit/7462a55a4256143c39fdaacfdaf334dfc4ba8b7d).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19803: [SPARK-22596][SQL] set ctx.currentVars in Codegen...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19803#discussion_r152881704
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/BoundAttribute.scala
 ---
@@ -60,20 +60,23 @@ case class BoundReference(ordinal: Int, dataType: 
DataType, nullable: Boolean)
 
   override def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
 val javaType = ctx.javaType(dataType)
-val value = ctx.getValue(ctx.INPUT_ROW, dataType, ordinal.toString)
 if (ctx.currentVars != null && ctx.currentVars(ordinal) != null) {
   val oev = ctx.currentVars(ordinal)
   ev.isNull = oev.isNull
   ev.value = oev.value
-  val code = oev.code
-  oev.code = ""
-  ev.copy(code = code)
--- End diff --

Great, thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19800: [SPARK-22591][SQL] GenerateOrdering shouldn't cha...

2017-11-23 Thread viirya
Github user viirya commented on a diff in the pull request:

https://github.com/apache/spark/pull/19800#discussion_r152881639
  
--- Diff: 
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateOrdering.scala
 ---
@@ -149,10 +151,14 @@ object GenerateOrdering extends 
CodeGenerator[Seq[SortOrder], Ordering[InternalR
   })
 // make sure INPUT_ROW is declared even if splitExpressions
 // returns an inlined block
-s"""
+val finalCode = s"""
|InternalRow ${ctx.INPUT_ROW} = null;
|$code
  """.stripMargin
+// Restore original currentVars and INPUT_ROW.
+ctx.currentVars = oldCurrentVars
+ctx.INPUT_ROW = oldInputRow
+finalCode
--- End diff --

Yes, thanks.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19801: [SPARK-22592][SQL] cleanup filter converting for ...

2017-11-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/19801


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19801: [SPARK-22592][SQL] cleanup filter converting for hive

2017-11-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19801
  
Thanks! Merged to master.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19801: [SPARK-22592][SQL] cleanup filter converting for hive

2017-11-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/19801
  
LGTM


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19801: [SPARK-22592][SQL] cleanup filter converting for ...

2017-11-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19801#discussion_r152880052
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
@@ -660,40 +626,68 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
   }
 
   def unapply(values: Set[Any]): Option[Seq[String]] = {
-values.toSeq.foldLeft(Option(Seq.empty[String])) {
-  case (Some(accum), value) if 
valueToLiteralString.isDefinedAt(value) =>
-Some(accum :+ valueToLiteralString(value))
-  case _ => None
+val extractables = values.toSeq.map(valueToLiteralString.lift)
+if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
--- End diff --

The same here.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19801: [SPARK-22592][SQL] cleanup filter converting for ...

2017-11-23 Thread gatorsmile
Github user gatorsmile commented on a diff in the pull request:

https://github.com/apache/spark/pull/19801#discussion_r152880027
  
--- Diff: 
sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveShim.scala ---
@@ -643,9 +607,11 @@ private[client] class Shim_v0_13 extends Shim_v0_12 {
 
 object ExtractableLiterals {
   def unapply(exprs: Seq[Expression]): Option[Seq[String]] = {
-
exprs.map(ExtractableLiteral.unapply).foldLeft(Option(Seq.empty[String])) {
-  case (Some(accum), Some(value)) => Some(accum :+ value)
-  case _ => None
+val extractables = exprs.map(ExtractableLiteral.unapply)
+if (extractables.nonEmpty && extractables.forall(_.isDefined)) {
--- End diff --

-> `if (extractables.exists(_.isDefined))`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19806: [SPARK-22595][SQL] fix flaky test: CastSuite.SPAR...

2017-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19806#discussion_r152879501
  
--- Diff: 
sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/CastSuite.scala
 ---
@@ -829,7 +829,7 @@ class CastSuite extends SparkFunSuite with 
ExpressionEvalHelper {
   }
 
   test("SPARK-22500: cast for struct should not generate codes beyond 
64KB") {
-val N = 250
+val N = 25
--- End diff --

is 25 big enough to reproduce the bug of SPARK-22595?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19789
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19789
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84143/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19789
  
**[Test build #84143 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84143/testReport)**
 for PR 19789 at commit 
[`c2c3ed9`](https://github.com/apache/spark/commit/c2c3ed9e2e672b3b9205a824b8cdd2f9ca2131eb).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds the following public classes _(experimental)_:
  * `class CachedKafkaConsumerSuite extends SparkFunSuite with 
BeforeAndAfterAll `


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19789
  
**[Test build #84143 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84143/testReport)**
 for PR 19789 at commit 
[`c2c3ed9`](https://github.com/apache/spark/commit/c2c3ed9e2e672b3b9205a824b8cdd2f9ca2131eb).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19807: [SPARK-22495] Fix setup of SPARK_HOME variable on Window...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19807
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19807: [SPARK-22495] Fix setup of SPARK_HOME variable on Window...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19807
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84142/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19807: [SPARK-22495] Fix setup of SPARK_HOME variable on Window...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19807
  
**[Test build #84142 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84142/testReport)**
 for PR 19807 at commit 
[`bd24e47`](https://github.com/apache/spark/commit/bd24e470227437a52b07d95335579f1afcdda905).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19789: [SPARK-22562][Streaming] CachedKafkaConsumer unsafe evic...

2017-11-23 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/19789
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19806: [SPARK-22595][SQL] fix flaky test: CastSuite.SPARK-22500...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19806
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19806: [SPARK-22595][SQL] fix flaky test: CastSuite.SPARK-22500...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19806
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84141/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19806: [SPARK-22595][SQL] fix flaky test: CastSuite.SPARK-22500...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19806
  
**[Test build #84141 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84141/testReport)**
 for PR 19806 at commit 
[`48103dd`](https://github.com/apache/spark/commit/48103dd26817629cc00619ea8659d14533f53fb5).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19763: [SPARK-22537][core] Aggregation of map output sta...

2017-11-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19763#discussion_r152870781
  
--- Diff: core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---
@@ -472,15 +475,66 @@ private[spark] class MapOutputTrackerMaster(
 shuffleStatuses.get(shuffleId).map(_.findMissingPartitions())
   }
 
+  /**
+   * Grouped function of Range, this is to avoid traverse of all elements 
of Range using
+   * IterableLike's grouped function.
+   */
+  def rangeGrouped(range: Range, size: Int): Seq[Range] = {
+val start = range.start
+val step = range.step
+val end = range.end
+for (i <- start.until(end, size * step)) yield {
+  i.until(i + size * step, step)
+}
+  }
+
+  /**
+   * To equally divide n elements into m buckets, basically each bucket 
should have n/m elements,
+   * for the remaining n%m elements, add one more element to the first n%m 
buckets each.
+   */
+  def equallyDivide(numElements: Int, numBuckets: Int): Seq[Seq[Int]] = {
+val elementsPerBucket = numElements / numBuckets
+val remaining = numElements % numBuckets
+val splitPoint = (elementsPerBucket + 1) * remaining
+if (elementsPerBucket == 0) {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1)
+} else {
+  rangeGrouped(0.until(splitPoint), elementsPerBucket + 1) ++
+rangeGrouped(splitPoint.until(numElements), elementsPerBucket)
+}
+  }
+
   /**
* Return statistics about all of the outputs for a given shuffle.
*/
   def getStatistics(dep: ShuffleDependency[_, _, _]): MapOutputStatistics 
= {
 shuffleStatuses(dep.shuffleId).withMapStatuses { statuses =>
   val totalSizes = new Array[Long](dep.partitioner.numPartitions)
-  for (s <- statuses) {
-for (i <- 0 until totalSizes.length) {
-  totalSizes(i) += s.getSizeForBlock(i)
+  val parallelAggThreshold = conf.get(
+SHUFFLE_MAP_OUTPUT_PARALLEL_AGGREGATION_THRESHOLD)
+  val parallelism = math.min(
+Runtime.getRuntime.availableProcessors(),
+statuses.length * totalSizes.length / parallelAggThreshold + 1)
+  if (parallelism <= 1) {
+for (s <- statuses) {
+  for (i <- 0 until totalSizes.length) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  } else {
+try {
+  val threadPool = 
ThreadUtils.newDaemonFixedThreadPool(parallelism, "map-output-aggregate")
+  implicit val executionContext = 
ExecutionContext.fromExecutor(threadPool)
+  val mapStatusSubmitTasks = equallyDivide(totalSizes.length, 
parallelism).map {
+reduceIds => Future {
+  for (s <- statuses; i <- reduceIds) {
+totalSizes(i) += s.getSizeForBlock(i)
+  }
+}
+  }
+  ThreadUtils.awaitResult(Future.sequence(mapStatusSubmitTasks), 
Duration.Inf)
+} finally {
+  threadpool.shutdown()
--- End diff --

ah good catch! I misread it...


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19763
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19763
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84134/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19763: [SPARK-22537][core] Aggregation of map output statistics...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19763
  
**[Test build #84134 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84134/testReport)**
 for PR 19763 at commit 
[`72c3d97`](https://github.com/apache/spark/commit/72c3d97e6e2f2c50504c5e4d8b80ea595797b044).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19803
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84137/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19803
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19803
  
**[Test build #84137 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/84137/testReport)**
 for PR 19803 at commit 
[`6758a34`](https://github.com/apache/spark/commit/6758a34aa6bd528a8fa8ab7d5db8964d1121f848).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19803
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19803: [SPARK-22596][SQL] set ctx.currentVars in CodegenSupport...

2017-11-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19803
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/84138/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



  1   2   3   >