[GitHub] spark issue #20648: [SPARK-23448][SQL] JSON parser should return partial row...

2018-02-23 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/20648
  
@HyukjinKwon @cloud-fan Thanks for the comment! Yes, I agree we need to 
keep the CSV behavior. I will check how much we can still clean up while keeping it.


---




[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20622
  
**[Test build #87636 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87636/testReport)**
 for PR 20622 at commit 
[`d404baf`](https://github.com/apache/spark/commit/d404bafa869a4950d607200586c88ea17d99f9f5).


---




[GitHub] spark pull request #20622: [SPARK-23491][SS] Remove explicit job cancellatio...

2018-02-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20622#discussion_r170389827
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -266,6 +264,12 @@ class ContinuousExecution(
 SQLExecution.withNewExecutionId(
   sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
   }
+} catch {
+  case t: Throwable
+  if StreamExecution.isInterruptionException(t) && state.get() == 
RECONFIGURING =>
+stopSources()
+sparkSession.sparkContext.cancelJobGroup(runId.toString)
--- End diff --

nit: I think it's cleaner to put this in the `finally`, since this is the 
invariant we want when this entire method terminates.


---




[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20622
  
**[Test build #87637 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87637/testReport)**
 for PR 20622 at commit 
[`d3b16c1`](https://github.com/apache/spark/commit/d3b16c11671ba6514360121556ed5554f8bcf890).


---




[GitHub] spark issue #20653: [SPARK-23459][SQL] Improve the error message when unknow...

2018-02-23 Thread gatorsmile
Github user gatorsmile commented on the issue:

https://github.com/apache/spark/pull/20653
  
LGTM

Thanks! Merged to master


---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20604
  
**[Test build #87635 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87635/testReport)**
 for PR 20604 at commit 
[`35314cb`](https://github.com/apache/spark/commit/35314cbd1cf999a87145c582006699a2ea261e87).


---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20604
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1022/
Test PASSed.


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19222
  
Merged build finished. Test PASSed.


---




[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...

2018-02-23 Thread squito
Github user squito commented on a diff in the pull request:

https://github.com/apache/spark/pull/20604#discussion_r170383918
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient {
   /**
* Request that the cluster manager kill the specified executors.
*
-   * When asking the executor to be replaced, the executor loss is 
considered a failure, and
-   * killed tasks that are running on the executor will count towards the 
failure limits. If no
-   * replacement is being requested, then the tasks will not count towards 
the limit.
-   *
* @param executorIds identifiers of executors to kill
-   * @param replace whether to replace the killed executors with new ones, 
default false
+   * @param adjustTargetNumExecutors whether the target number of 
executors will be adjusted down
+   * after these executors have been killed
+   * @param countFailures if there are tasks running on the executors when 
they are killed, whether
--- End diff --

whoops, I was supposed to set `countFailures = true` in 
`sc.killAndReplaceExecutors`, thanks for catching that.
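
For readers following along, a sketch of the two call styles implied by the updated scaladoc; the exact signature is assumed from the parameter names in this diff, and `client` stands in for an `ExecutorAllocationClient` (other parameters are omitted):

```scala
// Kill executors that will be replaced: the target number of executors stays
// the same, and tasks killed with them count toward the failure limits.
client.killExecutors(
  executorIds = Seq("1", "2"),
  adjustTargetNumExecutors = false,
  countFailures = true)

// Scale down through dynamic allocation: lower the target, and do not count
// the killed tasks as failures.
client.killExecutors(
  executorIds = Seq("3"),
  adjustTargetNumExecutors = true,
  countFailures = false)
```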


---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20604
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...

2018-02-23 Thread tdas
Github user tdas commented on the issue:

https://github.com/apache/spark/pull/20622
  
LGTM, assuming tests pass.


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19222
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1023/
Test PASSed.


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19222
  
**[Test build #87638 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87638/testReport)**
 for PR 19222 at commit 
[`95fbdee`](https://github.com/apache/spark/commit/95fbdee04e9137938cdc76f7f4573116720357f5).


---




[GitHub] spark pull request #20622: [SPARK-23491][SS] Remove explicit job cancellatio...

2018-02-23 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20622#discussion_r170392408
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
 ---
@@ -266,6 +264,12 @@ class ContinuousExecution(
 SQLExecution.withNewExecutionId(
   sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
   }
+} catch {
+  case t: Throwable
+  if StreamExecution.isInterruptionException(t) && state.get() == 
RECONFIGURING =>
+stopSources()
+sparkSession.sparkContext.cancelJobGroup(runId.toString)
--- End diff --

So we only swallow the exception when we are reconfiguring (btw, always 
add logging when swallowing exceptions, to leave a trail for debugging), and 
`stopSources()` and `cancelJobGroup()` can go in the `finally`, as we want that as an 
invariant no matter what happens in this `runContinuous` method.
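
A self-contained sketch of the control flow described above; the real code would use `StreamExecution.isInterruptionException`, `state.get() == RECONFIGURING`, `stopSources()`, and `cancelJobGroup(runId.toString)`, which the stand-ins below only imitate:

```scala
object ReconfigureCleanupSketch {
  @volatile private var reconfiguring = false   // stands in for state.get() == RECONFIGURING

  private def stopSources(): Unit = println("sources stopped")
  private def cancelJobGroup(): Unit = println("job group cancelled")

  // Shape of runContinuous(): swallow (and log) only the interruption we caused
  // ourselves by reconfiguring; run the cleanup in `finally` on every exit path.
  def runContinuousSketch(body: () => Unit): Unit = {
    try {
      body()
    } catch {
      case e: InterruptedException if reconfiguring =>
        println(s"Query interrupted for reconfiguration: $e")   // leave a trail
    } finally {
      stopSources()
      cancelJobGroup()
    }
  }

  def main(args: Array[String]): Unit = {
    reconfiguring = true
    runContinuousSketch(() => throw new InterruptedException("reconfigure"))
  }
}
```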




---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20604
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87635/
Test FAILed.


---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20604
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1024/
Test PASSed.


---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20604
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-23 Thread rdblue
Github user rdblue commented on the issue:

https://github.com/apache/spark/pull/20647
  
Thanks for removing the equality methods. This changes equality for the 
scan and streaming relation, though. Are those significant changes?

I still think this should not be committed until the style-only changes are 
rolled back. This is a significant source of headache for branch maintainers 
and contributors.


---




[GitHub] spark issue #20648: [SPARK-23448][SQL] JSON parser should return partial row...

2018-02-23 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/20648
  
> Yup, +1 for starting this by disallowing, but to my knowledge R's read.csv 
allows the case when the length of tokens is shorter than the schema, putting 
nulls (or NA) into the missing fields, as a valid case.

@HyukjinKwon If the length of tokens is longer than the schema, R's 
read.csv does not seem to raise an error either. Is this also the behavior we want?

Spark's CSV reader just drops the extra tokens under permissive mode.
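
A hedged illustration of that CSV behavior (hypothetical two-column schema and data; PERMISSIVE is the default mode):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructType}

val spark = SparkSession.builder().master("local[*]").appName("csv-tokens").getOrCreate()
import spark.implicits._

val schema = new StructType().add("a", IntegerType).add("b", IntegerType)

// One record with fewer tokens than the schema, one with more.
val rows = Seq("1", "1,2,3").toDS()

spark.read.schema(schema).csv(rows).show()
// Expected, per the discussion above: the short record is padded with nulls
// and the extra token of the long record is dropped:
// +---+----+
// |  a|   b|
// +---+----+
// |  1|null|
// |  1|   2|
// +---+----+
```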


---




[GitHub] spark issue #20611: [SPARK-23425][SQL]When wild card is been used in load co...

2018-02-23 Thread sujith71955
Github user sujith71955 commented on the issue:

https://github.com/apache/spark/pull/20611
  
@gatorsmile These seem to be random failures; each run a different set of test 
cases fails. Please let me know if you have any suggestions.


---




[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...

2018-02-23 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20632#discussion_r170412046
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala ---
@@ -631,6 +651,160 @@ class RandomForestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 val expected = Map(0 -> 1.0 / 3.0, 2 -> 2.0 / 3.0)
 assert(mapToVec(map.toMap) ~== mapToVec(expected) relTol 0.01)
   }
+
+  test("[SPARK-3159] tree model redundancy - binary classification") {
+val numClasses = 2
+
+val strategy = new OldStrategy(algo = OldAlgo.Classification, impurity 
= Gini, maxDepth = 4,
+  numClasses = numClasses, maxBins = 32)
+
+val dt = buildRedundantDecisionTree(numClasses, 20, strategy = 
strategy)
+
+/* Expected tree structure tested below:
+ root
+  left1   right1
+left2  right2
+
+  pred(left1)  = 0
+  pred(left2)  = 1
+  pred(right2) = 0
+ */
+assert(dt.rootNode.numDescendants === 4)
+assert(dt.rootNode.subtreeDepth === 2)
+
+assert(dt.rootNode.isInstanceOf[InternalNode])
+
+// left 1 prediction test
+assert(dt.rootNode.asInstanceOf[InternalNode].leftChild.prediction === 
0)
+
+val right1 = dt.rootNode.asInstanceOf[InternalNode].rightChild
+assert(right1.isInstanceOf[InternalNode])
+
+// left 2 prediction test
+assert(right1.asInstanceOf[InternalNode].leftChild.prediction === 1)
+// right 2 prediction test
+assert(right1.asInstanceOf[InternalNode].rightChild.prediction === 0)
+  }
+
+  test("[SPARK-3159] tree model redundancy - multiclass classification") {
+val numClasses = 4
+
+val strategy = new OldStrategy(algo = OldAlgo.Classification, impurity 
= Gini, maxDepth = 4,
+  numClasses = numClasses, maxBins = 32)
+
+val dt = buildRedundantDecisionTree(numClasses, 20, strategy = 
strategy)
+
+/* Expected tree structure tested below:
+root
+left1 right1
+left2  right2  left3  right3
+
+  pred(left2)  = 0
+  pred(right2)  = 1
+  pred(left3) = 2
+  pred(right3) = 1
+ */
+assert(dt.rootNode.numDescendants === 6)
+assert(dt.rootNode.subtreeDepth === 2)
+
+assert(dt.rootNode.isInstanceOf[InternalNode])
+
+val left1 = dt.rootNode.asInstanceOf[InternalNode].leftChild
+val right1 = dt.rootNode.asInstanceOf[InternalNode].rightChild
+
+assert(left1.isInstanceOf[InternalNode])
+
+// left 2 prediction test
+assert(left1.asInstanceOf[InternalNode].leftChild.prediction === 0)
+// right 2 prediction test
+assert(left1.asInstanceOf[InternalNode].rightChild.prediction === 1)
+
+assert(right1.isInstanceOf[InternalNode])
+
+// left 3 prediction test
+assert(right1.asInstanceOf[InternalNode].leftChild.prediction === 2)
+// right 3 prediction test
+assert(right1.asInstanceOf[InternalNode].rightChild.prediction === 1)
+  }
+
+  test("[SPARK-3159] tree model redundancy - regression") {
+val numClasses = 2
+
+val strategy = new OldStrategy(algo = OldAlgo.Regression, impurity = 
Variance,
+  maxDepth = 3, maxBins = 10, numClasses = numClasses)
+
+val dt = buildRedundantDecisionTree(numClasses, 20, strategy = 
strategy)
+
+/* Expected tree structure tested below:
+root
+1 2
+   1_1 1_2 2_1 2_2
+  1_1_1   1_1_2   1_2_1   1_2_2   2_1_1   2_1_2
+
+  pred(1_1_1)  = 0.5
+  pred(1_1_2)  = 0.0
+  pred(1_2_1)  = 0.0
+  pred(1_2_2)  = 0.25
+  pred(2_1_1)  = 1.0
+  pred(2_1_2)  = 0.
+  pred(2_2)= 0.5
+ */
+
+assert(dt.rootNode.numDescendants === 12)
--- End diff --

Ok, trying to understand these tests. From what I can tell, you've written 
a data generator that generates random points and, somewhat by chance, 
generates redundant tree nodes if the tree is not pruned. You're relying on the 
random seed to give you a tree which should have exactly 12 descendants after 
pruning. 

I think these may be overly complicated. IMO, we just need to test the 
situation that causes this by creating a simple dataset that can improve the 
impurity by splitting, but which does not change the prediction. For example, 
for the Gini impurity you might have the following:

```scala
val data = Array(
  LabeledPoint(0.0, Vectors.dense(1.0)),
  LabeledPoint(0.0, Vectors.dense(1.0)),
  

[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...

2018-02-23 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20632#discussion_r170410747
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala ---
@@ -402,20 +405,40 @@ class RandomForestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
   LabeledPoint(1.0, Vectors.dense(2.0)))
 val input = sc.parallelize(arr)
 
+val seed = 42
+val numTrees = 1
+
 // Must set maxBins s.t. the feature will be treated as an ordered 
categorical feature.
 val strategy = new OldStrategy(algo = OldAlgo.Classification, impurity 
= Gini, maxDepth = 1,
   numClasses = 2, categoricalFeaturesInfo = Map(0 -> 3), maxBins = 3)
 
-val model = RandomForest.run(input, strategy, numTrees = 1, 
featureSubsetStrategy = "all",
-  seed = 42, instr = None).head
-model.rootNode match {
-  case n: InternalNode => n.split match {
-case s: CategoricalSplit =>
-  assert(s.leftCategories === Array(1.0))
-case _ => throw new AssertionError("model.rootNode.split was not a 
CategoricalSplit")
-  }
-  case _ => throw new AssertionError("model.rootNode was not an 
InternalNode")
-}
+val metadata = DecisionTreeMetadata.buildMetadata(input, strategy, 
numTrees = numTrees,
+  featureSubsetStrategy = "all")
+val splits = RandomForest.findSplits(input, metadata, seed = seed)
+
+val treeInput = TreePoint.convertToTreeRDD(input, splits, metadata)
+val baggedInput = BaggedPoint.convertToBaggedRDD(treeInput,
+  strategy.subsamplingRate, numTrees, false, seed = seed)
+
+val topNode = LearningNode.emptyNode(nodeIndex = 1)
+assert(topNode.isLeaf === false)
+assert(topNode.stats === null)
+
+val nodesForGroup = Map(0 -> Array(topNode))
+val treeToNodeToIndexInfo = Map(0 -> Map(
+  topNode.id -> new RandomForest.NodeIndexInfo(0, None)
+))
+val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+val bestSplit = RandomForest.findBestSplits(baggedInput, metadata, 
Map(0 -> topNode),
+  nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack)
+
+assert(topNode.split.isDefined, "rootNode does not have a split")
--- End diff --

I'm a fan of just calling `foreach` like:

```scala
topNode.split.foreach { split =>
  assert(split.isInstanceOf[CategoricalSplit])
  assert(split.toOld.categories === Array(1.0))
}
```


---




[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...

2018-02-23 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20632#discussion_r170410687
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala ---
@@ -402,20 +407,35 @@ class RandomForestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
   LabeledPoint(1.0, Vectors.dense(2.0)))
 val input = sc.parallelize(arr)
 
+val numTrees = 1
+
 // Must set maxBins s.t. the feature will be treated as an ordered 
categorical feature.
 val strategy = new OldStrategy(algo = OldAlgo.Classification, impurity 
= Gini, maxDepth = 1,
   numClasses = 2, categoricalFeaturesInfo = Map(0 -> 3), maxBins = 3)
 
-val model = RandomForest.run(input, strategy, numTrees = 1, 
featureSubsetStrategy = "all",
-  seed = 42, instr = None).head
-model.rootNode match {
-  case n: InternalNode => n.split match {
-case s: CategoricalSplit =>
-  assert(s.leftCategories === Array(1.0))
-case _ => throw new AssertionError("model.rootNode.split was not a 
CategoricalSplit")
-  }
-  case _ => throw new AssertionError("model.rootNode was not an 
InternalNode")
-}
+val metadata = DecisionTreeMetadata.buildMetadata(input, strategy, 
numTrees = numTrees,
+  featureSubsetStrategy = "all")
+val splits = RandomForest.findSplits(input, metadata, seed = seed)
+
+val treeInput = TreePoint.convertToTreeRDD(input, splits, metadata)
+val baggedInput = BaggedPoint.convertToBaggedRDD(treeInput,
+  strategy.subsamplingRate, numTrees, false, seed = seed)
+
+val topNode = LearningNode.emptyNode(nodeIndex = 1)
+val nodesForGroup = Map(0 -> Array(topNode))
+val treeToNodeToIndexInfo = Map(0 -> Map(
+  topNode.id -> new RandomForest.NodeIndexInfo(0, None)
+))
+val nodeStack = new mutable.ArrayStack[(Int, LearningNode)]
+val bestSplit = RandomForest.findBestSplits(baggedInput, metadata, 
Map(0 -> topNode),
--- End diff --

This method returns `Unit`, so it doesn't make sense to assign its output.


---




[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...

2018-02-23 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20632#discussion_r170410905
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala ---
@@ -541,7 +541,7 @@ object DecisionTreeSuite extends SparkFunSuite {
 Array[LabeledPoint] = {
 val arr = new Array[LabeledPoint](3000)
 for (i <- 0 until 3000) {
-  if (i < 1000) {
+  if (i < 1001) {
--- End diff --

this is the type of thing that will puzzle someone down the line. I'm ok 
with it, though. :stuck_out_tongue_closed_eyes:


---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20604
  
**[Test build #87635 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87635/testReport)**
 for PR 20604 at commit 
[`35314cb`](https://github.com/apache/spark/commit/35314cbd1cf999a87145c582006699a2ea261e87).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20604
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/20604
  
retest this please


---




[GitHub] spark issue #20662: [SPARK-23475][UI][BACKPORT-2.3] Show also skipped stages

2018-02-23 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/20662
  
Merging to 2.3. Please close the PR manually.


---




[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20622
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87636/
Test PASSed.


---




[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20622
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20622
  
**[Test build #87636 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87636/testReport)**
 for PR 20622 at commit 
[`d404baf`](https://github.com/apache/spark/commit/d404bafa869a4950d607200586c88ea17d99f9f5).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20622
  
**[Test build #87637 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87637/testReport)**
 for PR 20622 at commit 
[`d3b16c1`](https://github.com/apache/spark/commit/d3b16c11671ba6514360121556ed5554f8bcf890).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19222
  
**[Test build #87638 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87638/testReport)**
 for PR 19222 at commit 
[`95fbdee`](https://github.com/apache/spark/commit/95fbdee04e9137938cdc76f7f4573116720357f5).
 * This patch **fails Spark unit tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20648: [SPARK-23448][SQL] JSON parser should return partial row...

2018-02-23 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20648
  
_To me_, I have been roughly thinking that we should match this to R's 
read.csv and explicitly document it. I believe that is a good reference our 
CSV reader has resembled so far.

BTW, I don't mind doing this separately; whatever you think is right.


---




[GitHub] spark issue #20648: [SPARK-23448][SQL] JSON parser should return partial row...

2018-02-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20648
  
I think at the least we should update the documentation for this behavior of the 
CSV reader.


---




[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...

2018-02-23 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20632#discussion_r170410775
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -283,10 +292,12 @@ private[tree] class LearningNode(
 // Here we want to keep same behavior with the old 
mllib.DecisionTreeModel
 new LeafNode(stats.impurityCalculator.predict, -1.0, 
stats.impurityCalculator)
   }
-
 }
   }
 
+  /** @return true iff a node is a leaf. */
+  private def isLeafNode(): Boolean = leftChild.isEmpty && 
rightChild.isEmpty
--- End diff --

I wouldn't mind just removing this change. What constitutes a leaf node is 
now fuzzy, and if you just inline it in the one place it's used there is no 
confusion. At any rate, you don't need the parentheses after the method name.


---




[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...

2018-02-23 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20632#discussion_r170412098
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala ---
@@ -631,6 +651,160 @@ class RandomForestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 val expected = Map(0 -> 1.0 / 3.0, 2 -> 2.0 / 3.0)
 assert(mapToVec(map.toMap) ~== mapToVec(expected) relTol 0.01)
   }
+
+  test("[SPARK-3159] tree model redundancy - binary classification") {
+val numClasses = 2
+
+val strategy = new OldStrategy(algo = OldAlgo.Classification, impurity 
= Gini, maxDepth = 4,
+  numClasses = numClasses, maxBins = 32)
+
+val dt = buildRedundantDecisionTree(numClasses, 20, strategy = 
strategy)
+
+/* Expected tree structure tested below:
+ root
+  left1   right1
+left2  right2
+
+  pred(left1)  = 0
+  pred(left2)  = 1
+  pred(right2) = 0
+ */
+assert(dt.rootNode.numDescendants === 4)
+assert(dt.rootNode.subtreeDepth === 2)
+
+assert(dt.rootNode.isInstanceOf[InternalNode])
+
+// left 1 prediction test
+assert(dt.rootNode.asInstanceOf[InternalNode].leftChild.prediction === 
0)
+
+val right1 = dt.rootNode.asInstanceOf[InternalNode].rightChild
+assert(right1.isInstanceOf[InternalNode])
+
+// left 2 prediction test
+assert(right1.asInstanceOf[InternalNode].leftChild.prediction === 1)
+// right 2 prediction test
+assert(right1.asInstanceOf[InternalNode].rightChild.prediction === 0)
+  }
+
+  test("[SPARK-3159] tree model redundancy - multiclass classification") {
+val numClasses = 4
+
+val strategy = new OldStrategy(algo = OldAlgo.Classification, impurity 
= Gini, maxDepth = 4,
+  numClasses = numClasses, maxBins = 32)
+
+val dt = buildRedundantDecisionTree(numClasses, 20, strategy = 
strategy)
+
+/* Expected tree structure tested below:
+root
+left1 right1
+left2  right2  left3  right3
+
+  pred(left2)  = 0
+  pred(right2)  = 1
+  pred(left3) = 2
+  pred(right3) = 1
+ */
+assert(dt.rootNode.numDescendants === 6)
+assert(dt.rootNode.subtreeDepth === 2)
+
+assert(dt.rootNode.isInstanceOf[InternalNode])
+
+val left1 = dt.rootNode.asInstanceOf[InternalNode].leftChild
+val right1 = dt.rootNode.asInstanceOf[InternalNode].rightChild
+
+assert(left1.isInstanceOf[InternalNode])
+
+// left 2 prediction test
+assert(left1.asInstanceOf[InternalNode].leftChild.prediction === 0)
+// right 2 prediction test
+assert(left1.asInstanceOf[InternalNode].rightChild.prediction === 1)
+
+assert(right1.isInstanceOf[InternalNode])
+
+// left 3 prediction test
+assert(right1.asInstanceOf[InternalNode].leftChild.prediction === 2)
+// right 3 prediction test
+assert(right1.asInstanceOf[InternalNode].rightChild.prediction === 1)
+  }
+
+  test("[SPARK-3159] tree model redundancy - regression") {
+val numClasses = 2
+
+val strategy = new OldStrategy(algo = OldAlgo.Regression, impurity = 
Variance,
+  maxDepth = 3, maxBins = 10, numClasses = numClasses)
+
+val dt = buildRedundantDecisionTree(numClasses, 20, strategy = 
strategy)
+
+/* Expected tree structure tested below:
+root
+1 2
+   1_1 1_2 2_1 2_2
+  1_1_1   1_1_2   1_2_1   1_2_2   2_1_1   2_1_2
+
+  pred(1_1_1)  = 0.5
+  pred(1_1_2)  = 0.0
+  pred(1_2_1)  = 0.0
+  pred(1_2_2)  = 0.25
+  pred(2_1_1)  = 1.0
+  pred(2_1_2)  = 0.
+  pred(2_2)= 0.5
+ */
+
+assert(dt.rootNode.numDescendants === 12)
--- End diff --

The tree tests are already so long and complicated that I think it's 
important to simplify where possible. These tests are useful as they are, but 
it probably won't be obvious to future devs why and how they work. Also, if we can 
avoid adding data-generation code, that would be nice (there's already tons of 
code like that lying around the test suites). 


---




[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...

2018-02-23 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20632#discussion_r170410834
  
--- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala ---
@@ -270,11 +269,21 @@ private[tree] class LearningNode(
* Convert this [[LearningNode]] to a regular [[Node]], and recurse on 
any children.
*/
   def toNode: Node = {
-if (leftChild.nonEmpty) {
-  assert(rightChild.nonEmpty && split.nonEmpty && stats != null,
+
+// convert to an inner node only when:
+//  -) the node is not a leaf, and
+//  -) the subtree rooted at this node cannot be replaced by a single 
leaf
+// (i.e., there at least two different leaf predictions appear in 
the subtree)
--- End diff --

This comment seems out of place now. You might just say `// when both 
children make the same prediction, collapse into single leaf` or something 
similar below the first case statement.
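
A self-contained sketch of that collapse rule, on simplified node types (illustration only; the real change lives in `LearningNode.toNode` and operates on the ML tree classes):

```scala
sealed trait SimpleNode { def prediction: Double }
case class Leaf(prediction: Double) extends SimpleNode
case class Internal(left: SimpleNode, right: SimpleNode, prediction: Double) extends SimpleNode

// When both converted children make the same prediction, the split is redundant
// and the whole subtree collapses into a single leaf.
def prune(node: SimpleNode): SimpleNode = node match {
  case Internal(l, r, pred) =>
    (prune(l), prune(r)) match {
      case (Leaf(p1), Leaf(p2)) if p1 == p2 => Leaf(p1)
      case (prunedL, prunedR)               => Internal(prunedL, prunedR, pred)
    }
  case leaf => leaf
}
```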


---




[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...

2018-02-23 Thread sethah
Github user sethah commented on a diff in the pull request:

https://github.com/apache/spark/pull/20632#discussion_r170410851
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala ---
@@ -18,17 +18,20 @@
 package org.apache.spark.ml.tree.impl
 
 import scala.collection.mutable
+import scala.util.Random
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkContext, SparkFunSuite}
 import org.apache.spark.ml.classification.DecisionTreeClassificationModel
 import org.apache.spark.ml.feature.LabeledPoint
 import org.apache.spark.ml.linalg.{Vector, Vectors}
 import org.apache.spark.ml.tree._
 import org.apache.spark.ml.util.TestingUtils._
 import org.apache.spark.mllib.tree.{DecisionTreeSuite => OldDTSuite, 
EnsembleTestHelper}
 import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, 
QuantileStrategy, Strategy => OldStrategy}
+import org.apache.spark.mllib.tree.configuration.FeatureType._
--- End diff --

unused


---




[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20647
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1025/
Test PASSed.


---




[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-23 Thread cloud-fan
Github user cloud-fan commented on the issue:

https://github.com/apache/spark/pull/20647
  
>  This changes equality for the scan and streaming relation, though.

If we think this is the right equality for `DataSourceV2Relation`, it 
should also be the right equality for the scan and the streaming relation. They should 
be consistent.

I've rolled back the unnecessary style-only changes, but kept the one that 
cleans up unused imports. I think this should be encouraged: it's very hard to 
spot unused imports during code review, so people should find and fix them 
while touching files in a PR; some IDEs can even do it automatically when you 
save the file.


---




[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20647
  
**[Test build #87640 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87640/testReport)**
 for PR 20647 at commit 
[`a73370a`](https://github.com/apache/spark/commit/a73370a5bf56f45ce67cd6cdaf86b53a14a67b5b).


---




[GitHub] spark pull request #20663: [SPARK-23501][UI] Refactor AllStagesPage in order...

2018-02-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20663#discussion_r170408040
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala 
---
@@ -143,76 +72,105 @@ private[ui] class AllStagesPage(parent: StagesTab) 
extends WebUIPage("") {
   Seq.empty[Node]
 }
   }
-if (shouldShowActiveStages) {
-  content ++=
-
-  
-
-Active Stages ({activeStages.size})
-  
- ++
-
-  {activeStagesTable.toNodeSeq}
-
-}
-if (shouldShowPendingStages) {
-  content ++=
-
-  
-
-Pending Stages ({pendingStages.size})
-  
- ++
-
-  {pendingStagesTable.toNodeSeq}
-
+
+tables.flatten.foreach(content ++= _)
--- End diff --

`content ++= tables.flatten`?

But I think this would be better as:

```
val summary = blah
val pools = if (sc.isDefined && isFairScheduler) ... else ...
val stages = tables.flatten

val content = summary ++ pools ++ stages
```


---




[GitHub] spark pull request #20663: [SPARK-23501][UI] Refactor AllStagesPage in order...

2018-02-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20663#discussion_r170407867
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala 
---
@@ -143,76 +72,105 @@ private[ui] class AllStagesPage(parent: StagesTab) 
extends WebUIPage("") {
   Seq.empty[Node]
 }
   }
-if (shouldShowActiveStages) {
-  content ++=
-
-  
-
-Active Stages ({activeStages.size})
-  
- ++
-
-  {activeStagesTable.toNodeSeq}
-
-}
-if (shouldShowPendingStages) {
-  content ++=
-
-  
-
-Pending Stages ({pendingStages.size})
-  
- ++
-
-  {pendingStagesTable.toNodeSeq}
-
+
+tables.flatten.foreach(content ++= _)
+
+UIUtils.headerSparkPage("Stages for All Jobs", content, parent)
+  }
+
+  def summaryAndTableForStatus(
+  status: StageStatus,
+  request: HttpServletRequest): (Option[Elem], Option[NodeSeq]) = {
+val stages = if (status == StageStatus.FAILED) {
+  allStages.filter(_.status == status).reverse
+} else {
+  allStages.filter(_.status == status)
 }
-if (shouldShowCompletedStages) {
-  content ++=
-
-  
-
-Completed Stages ({completedStageNumStr})
-  
- ++
-
-  {completedStagesTable.toNodeSeq}
-
+
+if (stages.isEmpty) {
+  (None, None)
+} else {
+  val killEnabled = status == StageStatus.ACTIVE && parent.killEnabled
+  val isFailedStage = status == StageStatus.FAILED
+
+  val stagesTable =
+new StageTableBase(parent.store, request, stages, 
tableHeaderID(status), stageTag(status),
+  parent.basePath, subPath, parent.isFairScheduler, killEnabled, 
isFailedStage)
+  val stagesSize = stages.size
+  (Some(summary(status, stagesSize)), Some(table(status, stagesTable, 
stagesSize)))
 }
-if (shouldShowSkippedStages) {
-  content ++=
-
-  
-
-Skipped Stages ({skippedStages.size})
-  
- ++
-
-  {skippedStagesTable.toNodeSeq}
-
+  }
+
+  private def tableHeaderID(status: StageStatus): String = status match {
--- End diff --

All these look very similar. Having a single method that does the mapping, and 
having the others call that method, would be nice.

e.g.

```
def stageTag(status: StageStatus) = s"${statusName(status)}Stage"
```

Then you could also get rid of `classSuffix`, for example, since it's only 
really called in one place, and the new implementation would be much simpler.
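
A sketch of that consolidation; `statusName` is the hypothetical base mapping, and the existing helpers become one-liners on top of it:

```scala
private def statusName(status: StageStatus): String = status match {
  case StageStatus.ACTIVE   => "active"
  case StageStatus.COMPLETE => "completed"
  case StageStatus.FAILED   => "failed"
  case StageStatus.PENDING  => "pending"
  case StageStatus.SKIPPED  => "skipped"
}

private def tableHeaderID(status: StageStatus): String = statusName(status)
private def stageTag(status: StageStatus): String = s"${statusName(status)}Stage"
private def headerDescription(status: StageStatus): String = statusName(status).capitalize
// classSuffix is only used in one place, so it can simply be inlined there as
// s"${statusName(status).capitalize}Stages".
```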


---




[GitHub] spark pull request #20663: [SPARK-23501][UI] Refactor AllStagesPage in order...

2018-02-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20663#discussion_r170407936
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala 
---
@@ -143,76 +72,105 @@ private[ui] class AllStagesPage(parent: StagesTab) 
extends WebUIPage("") {
   Seq.empty[Node]
 }
   }
-if (shouldShowActiveStages) {
-  content ++=
-
-  
-
-Active Stages ({activeStages.size})
-  
- ++
-
-  {activeStagesTable.toNodeSeq}
-
-}
-if (shouldShowPendingStages) {
-  content ++=
-
-  
-
-Pending Stages ({pendingStages.size})
-  
- ++
-
-  {pendingStagesTable.toNodeSeq}
-
+
+tables.flatten.foreach(content ++= _)
+
+UIUtils.headerSparkPage("Stages for All Jobs", content, parent)
+  }
+
+  def summaryAndTableForStatus(
+  status: StageStatus,
+  request: HttpServletRequest): (Option[Elem], Option[NodeSeq]) = {
+val stages = if (status == StageStatus.FAILED) {
+  allStages.filter(_.status == status).reverse
+} else {
+  allStages.filter(_.status == status)
 }
-if (shouldShowCompletedStages) {
-  content ++=
-
-  
-
-Completed Stages ({completedStageNumStr})
-  
- ++
-
-  {completedStagesTable.toNodeSeq}
-
+
+if (stages.isEmpty) {
+  (None, None)
+} else {
+  val killEnabled = status == StageStatus.ACTIVE && parent.killEnabled
+  val isFailedStage = status == StageStatus.FAILED
+
+  val stagesTable =
+new StageTableBase(parent.store, request, stages, 
tableHeaderID(status), stageTag(status),
+  parent.basePath, subPath, parent.isFairScheduler, killEnabled, 
isFailedStage)
+  val stagesSize = stages.size
+  (Some(summary(status, stagesSize)), Some(table(status, stagesTable, 
stagesSize)))
 }
-if (shouldShowSkippedStages) {
-  content ++=
-
-  
-
-Skipped Stages ({skippedStages.size})
-  
- ++
-
-  {skippedStagesTable.toNodeSeq}
-
+  }
+
+  private def tableHeaderID(status: StageStatus): String = status match {
+case StageStatus.ACTIVE => "active"
+case StageStatus.COMPLETE => "completed"
+case StageStatus.FAILED => "failed"
+case StageStatus.PENDING => "pending"
+case StageStatus.SKIPPED => "skipped"
+  }
+
+  private def stageTag(status: StageStatus): String = status match {
+case StageStatus.ACTIVE => "activeStage"
+case StageStatus.COMPLETE => "completedStage"
+case StageStatus.FAILED => "failedStage"
+case StageStatus.PENDING => "pendingStage"
+case StageStatus.SKIPPED => "skippedStage"
+  }
+
+  private def headerDescription(status: StageStatus): String = status 
match {
+case StageStatus.ACTIVE => "Active"
+case StageStatus.COMPLETE => "Completed"
+case StageStatus.FAILED => "Failed"
+case StageStatus.PENDING => "Pending"
+case StageStatus.SKIPPED => "Skipped"
+  }
+
+  private def classSuffix(status: StageStatus): String = status match {
+case StageStatus.ACTIVE => "ActiveStages"
+case StageStatus.COMPLETE => "CompletedStages"
+case StageStatus.FAILED => "FailedStages"
+case StageStatus.PENDING => "PendingStages"
+case StageStatus.SKIPPED => "SkippedStages"
+  }
+
+  private def summaryContent(status: StageStatus, size: Int): String = {
+if (status == StageStatus.COMPLETE
+&& appSummary.numCompletedStages != size) {
+  s"${appSummary.numCompletedStages}, only showing $size"
+} else {
+  s"$size"
 }
-if (shouldShowFailedStages) {
-  content ++=
-
-  
-
-Failed Stages ({numFailedStages})
-  
- ++
-
-  {failedStagesTable.toNodeSeq}
-
+  }
+
+  private def summary(status: StageStatus, size: Int): Elem = {
+val summary =
+  
+
+  {headerDescription(status)} Stages:
+
+{summaryContent(status, size)}
+  
+
+if (status == StageStatus.COMPLETE) {
+  summary % Attribute(None, "id", Text("completed-summary"), Null)
--- End diff --

In the previous code this was also the case for `SKIPPED`, are you changing 

[GitHub] spark pull request #20663: [SPARK-23501][UI] Refactor AllStagesPage in order...

2018-02-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20663#discussion_r170407811
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala 
---
@@ -19,46 +19,22 @@ package org.apache.spark.ui.jobs
 
 import javax.servlet.http.HttpServletRequest
 
-import scala.xml.{Node, NodeSeq}
+import scala.xml.{Attribute, Elem, Node, NodeSeq, Null, Text}
 
 import org.apache.spark.scheduler.Schedulable
 import org.apache.spark.status.PoolData
-import org.apache.spark.status.api.v1._
+import org.apache.spark.status.api.v1.StageStatus
 import org.apache.spark.ui.{UIUtils, WebUIPage}
 
 /** Page showing list of all ongoing and recently finished stages and 
pools */
 private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") {
   private val sc = parent.sc
+  private lazy val allStages = parent.store.stageList(null)
--- End diff --

IIRC the class (`AllStagesPage`) is only instantiated once, and the 
`render` method is called for each request. So this won't really work.
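
A minimal sketch of the implication, assuming `render(request)` is the per-request entry point as in the surrounding code (the elided body is unchanged):

```scala
def render(request: HttpServletRequest): Seq[Node] = {
  // AllStagesPage is created once, so a `lazy val` would cache the very first
  // snapshot of the stage list forever; reading the store here re-fetches it
  // on every request instead.
  val allStages = parent.store.stageList(null)
  // ... build the per-status summaries and tables from allStages as before ...
}
```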


---




[GitHub] spark pull request #20663: [SPARK-23501][UI] Refactor AllStagesPage in order...

2018-02-23 Thread vanzin
Github user vanzin commented on a diff in the pull request:

https://github.com/apache/spark/pull/20663#discussion_r170407883
  
--- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala 
---
@@ -143,76 +72,105 @@ private[ui] class AllStagesPage(parent: StagesTab) 
extends WebUIPage("") {
   Seq.empty[Node]
 }
   }
-if (shouldShowActiveStages) {
-  content ++=
-
-  
-
-Active Stages ({activeStages.size})
-  
- ++
-
-  {activeStagesTable.toNodeSeq}
-
-}
-if (shouldShowPendingStages) {
-  content ++=
-
-  
-
-Pending Stages ({pendingStages.size})
-  
- ++
-
-  {pendingStagesTable.toNodeSeq}
-
+
+tables.flatten.foreach(content ++= _)
+
+UIUtils.headerSparkPage("Stages for All Jobs", content, parent)
+  }
+
+  def summaryAndTableForStatus(
+  status: StageStatus,
+  request: HttpServletRequest): (Option[Elem], Option[NodeSeq]) = {
+val stages = if (status == StageStatus.FAILED) {
+  allStages.filter(_.status == status).reverse
+} else {
+  allStages.filter(_.status == status)
 }
-if (shouldShowCompletedStages) {
-  content ++=
-
-  
-
-Completed Stages ({completedStageNumStr})
-  
- ++
-
-  {completedStagesTable.toNodeSeq}
-
+
+if (stages.isEmpty) {
+  (None, None)
+} else {
+  val killEnabled = status == StageStatus.ACTIVE && parent.killEnabled
+  val isFailedStage = status == StageStatus.FAILED
+
+  val stagesTable =
+new StageTableBase(parent.store, request, stages, 
tableHeaderID(status), stageTag(status),
+  parent.basePath, subPath, parent.isFairScheduler, killEnabled, 
isFailedStage)
+  val stagesSize = stages.size
+  (Some(summary(status, stagesSize)), Some(table(status, stagesTable, 
stagesSize)))
 }
-if (shouldShowSkippedStages) {
-  content ++=
-
-  
-
-Skipped Stages ({skippedStages.size})
-  
- ++
-
-  {skippedStagesTable.toNodeSeq}
-
+  }
+
+  private def tableHeaderID(status: StageStatus): String = status match {
+case StageStatus.ACTIVE => "active"
+case StageStatus.COMPLETE => "completed"
+case StageStatus.FAILED => "failed"
+case StageStatus.PENDING => "pending"
+case StageStatus.SKIPPED => "skipped"
+  }
+
+  private def stageTag(status: StageStatus): String = status match {
+case StageStatus.ACTIVE => "activeStage"
+case StageStatus.COMPLETE => "completedStage"
+case StageStatus.FAILED => "failedStage"
+case StageStatus.PENDING => "pendingStage"
+case StageStatus.SKIPPED => "skippedStage"
+  }
+
+  private def headerDescription(status: StageStatus): String = status 
match {
+case StageStatus.ACTIVE => "Active"
+case StageStatus.COMPLETE => "Completed"
+case StageStatus.FAILED => "Failed"
+case StageStatus.PENDING => "Pending"
+case StageStatus.SKIPPED => "Skipped"
+  }
+
+  private def classSuffix(status: StageStatus): String = status match {
+case StageStatus.ACTIVE => "ActiveStages"
+case StageStatus.COMPLETE => "CompletedStages"
+case StageStatus.FAILED => "FailedStages"
+case StageStatus.PENDING => "PendingStages"
+case StageStatus.SKIPPED => "SkippedStages"
+  }
+
+  private def summaryContent(status: StageStatus, size: Int): String = {
+if (status == StageStatus.COMPLETE
+&& appSummary.numCompletedStages != size) {
--- End diff --

Fits in previous line.


---




[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20622
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87637/
Test PASSed.


---




[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20622
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20647
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20604
  
**[Test build #87639 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87639/testReport)**
 for PR 20604 at commit 
[`35314cb`](https://github.com/apache/spark/commit/35314cbd1cf999a87145c582006699a2ea261e87).


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19222
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87638/
Test FAILed.


---




[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19222
  
Merged build finished. Test FAILed.


---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20604
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87639/
Test PASSed.


---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20604
  
**[Test build #87639 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87639/testReport)**
 for PR 20604 at commit 
[`35314cb`](https://github.com/apache/spark/commit/35314cbd1cf999a87145c582006699a2ea261e87).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---




[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20604
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser behavior ...

2018-02-23 Thread viirya
Github user viirya commented on the issue:

https://github.com/apache/spark/pull/20666
  
cc @cloud-fan @HyukjinKwon To keep the CSV reader's behavior for corrupted 
records, we won't bother with refactoring. But we should update the documentation and 
explicitly document that partial results are not returned for corrupted records.


---




[GitHub] spark issue #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser behavior ...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20666
  
**[Test build #87641 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87641/testReport)**
 for PR 20666 at commit 
[`4ad330b`](https://github.com/apache/spark/commit/4ad330b1def558e17dfb693d428e1bd69248e5a3).


---




[GitHub] spark issue #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser behavior ...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20666
  
Merged build finished. Test PASSed.


---




[GitHub] spark issue #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser behavior ...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20666
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1026/
Test PASSed.


---




[GitHub] spark pull request #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser be...

2018-02-23 Thread viirya
GitHub user viirya opened a pull request:

https://github.com/apache/spark/pull/20666

[SPARK-23448][SQL] Clarify JSON and CSV parser behavior in document

## What changes were proposed in this pull request?

Clarify JSON and CSV reader behavior in document.

JSON doesn't support partial results for corrupted records.
CSV only supports partial results for records with more or fewer tokens than the schema.
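
For illustration only (not part of the original PR description), a hedged sketch of the JSON side under PERMISSIVE mode, the default, with made-up data:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{IntegerType, StructType}

val spark = SparkSession.builder().master("local[*]").appName("json-permissive").getOrCreate()
import spark.implicits._

val schema = new StructType().add("a", IntegerType).add("b", IntegerType)

// A corrupted JSON record: field "a" is readable, but the record as a whole is
// malformed, so no partial result is kept and both columns come back null.
spark.read.schema(schema)
  .json(Seq("""{"a": 1, "b": """).toDS())
  .show()
// Expected, per the description above:
// +----+----+
// |   a|   b|
// +----+----+
// |null|null|
// +----+----+
```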

## How was this patch tested?

Pass existing tests.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/viirya/spark-1 SPARK-23448-2

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20666.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20666


commit 4ad330b1def558e17dfb693d428e1bd69248e5a3
Author: Liang-Chi Hsieh 
Date:   2018-02-24T07:15:11Z

Clarify JSON and CSV parser behavior.




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20647: [SPARK-23303][SQL] improve the explain result for...

2018-02-23 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20647#discussion_r170311026
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -23,11 +23,11 @@ import org.apache.spark.sql.execution.SparkPlan
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-case relation: DataSourceV2Relation =>
-  DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
+case r: DataSourceV2Relation =>
--- End diff --

I'm pointing it out because I think it is significant. This is also why I 
sent a note to the dev list. Changes like this make it much harder to work with 
Spark because little things change that are not necessary and cause conflicts. 
That's a problem not just when I'm maintaining a branch, but also when I'm 
trying to get a PR committed. These changes cause more work because we have to 
rebase PRs and update for variables that have changed names in the name of 
style.

In this particular case, I would probably wrap the line that is too long. 
If you want to rename to fix it, I'd consider that reasonable. But all the 
other cases that aren't necessary should be reverted.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20647
  
**[Test build #87634 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87634/testReport)**
 for PR 20647 at commit 
[`fc29f8f`](https://github.com/apache/spark/commit/fc29f8f7b5aa1bd33f5b4aa1ffd73bd9d84afc15).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20663: [SPARK-23475][UI][FOLLOWUP] Refactor AllStagesPage in or...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20663
  
**[Test build #87631 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87631/testReport)**
 for PR 20663 at commit 
[`d246df2`](https://github.com/apache/spark/commit/d246df283e9a900cf114fcdf0eee2951b1bd3713).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-02-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r170318693
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+  private final byte[] array;
+
+  public ByteArrayMemoryBlock(byte[] obj, long offset, long length) {
+super(obj, offset, length);
+this.array = obj;
+  }
+
+  @Override
+  public MemoryBlock allocate(long offset, long size) {
+return new ByteArrayMemoryBlock(array, offset, size);
+  }
+
+  public byte[] getByteArray() { return array; }
+
+  /**
+   * Creates a memory block pointing to the memory used by the byte array.
+   */
+  public static ByteArrayMemoryBlock fromArray(final byte[] array) {
+return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, 
array.length);
+  }
+
+
+  public final int getInt(long offset) {
+// UTF8String.getPrefix() assumes data is 4-byte aligned
+assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= ((array.length + 3) 
/ 4) * 4);
+return Platform.getInt(array, offset);
+  }
+
+  public final void putInt(long offset, int value) {
+assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+Platform.putInt(array, offset, value);
+  }
+
+  public final boolean getBoolean(long offset) {
+assert(offset + 1 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+return Platform.getBoolean(array, offset);
+  }
+
+  public final void putBoolean(long offset, boolean value) {
+assert(offset + 1 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+Platform.putBoolean(array, offset, value);
+  }
+
+  public final byte getByte(long offset) {
+return array[(int)(offset - Platform.BYTE_ARRAY_OFFSET)];
+  }
+
+  public final void putByte(long offset, byte value) {
+array[(int)(offset - Platform.BYTE_ARRAY_OFFSET)] = value;
+  }
+
+  public final short getShort(long offset) {
+assert(offset + 2 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+return Platform.getShort(array, offset);
+  }
+
+  public final void putShort(long offset, short value) {
+assert(offset + 2 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+Platform.putShort(array, offset, value);
+  }
+
+  public final long getLong(long offset) {
+// UTF8String.getPrefix() assumes data is 8-byte aligned
+assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= ((array.length + 7) 
/ 8) * 8);
+return Platform.getLong(array, offset);
+  }
+
+  public final void putLong(long offset, long value) {
+assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+Platform.putLong(array, offset, value);
+  }
+
+  public final float getFloat(long offset) {
+assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+return Platform.getFloat(array, offset);
+  }
+
+  public final void putFloat(long offset, float value) {
+assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+Platform.putFloat(array, offset, value);
+  }
+
+  public final double getDouble(long offset) {
+assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+return Platform.getDouble(array, offset);
+  }
+
+  public final void putDouble(long offset, double value) {
+assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+Platform.putDouble(array, offset, value);
+  }
+
+  public final void copyFrom(byte[] src, long srcOffset, long dstOffset, 
long length) {
--- End 

[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...

2018-02-23 Thread liyinan926
Github user liyinan926 commented on the issue:

https://github.com/apache/spark/pull/20553
  
`spark.kubernetes.executor.cores` has nothing to do with dynamic resource 
allocation. It's just a way of letting users specify a value for the cpu 
resource request that conforms to Kubernetes convention and is only read/used 
when determining the cpu request. 
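
For context, a hedged sketch of what setting it might look like for a user. The config name is the one proposed in this PR; the "500m" millicore value is an assumption about which Kubernetes quantity forms are accepted.

```scala
import org.apache.spark.SparkConf

// Sketch only: the k8s-specific conf expresses the cpu *request* in Kubernetes
// quantity form, alongside the usual executor settings.
val conf = new SparkConf()
  .set("spark.executor.instances", "4")
  .set("spark.kubernetes.executor.cores", "500m")  // cpu request sent to Kubernetes (assumed syntax)
  .set("spark.executor.memory", "2g")
```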


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20647: [SPARK-23303][SQL] improve the explain result for...

2018-02-23 Thread rdblue
Github user rdblue commented on a diff in the pull request:

https://github.com/apache/spark/pull/20647#discussion_r170307194
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
 ---
@@ -35,15 +35,14 @@ case class DataSourceV2Relation(
 options: Map[String, String],
 projection: Seq[AttributeReference],
 filters: Option[Seq[Expression]] = None,
-userSpecifiedSchema: Option[StructType] = None) extends LeafNode with 
MultiInstanceRelation {
+userSpecifiedSchema: Option[StructType] = None)
+  extends LeafNode with MultiInstanceRelation with DataSourceV2QueryPlan {
--- End diff --

Part of the utility of an immutable plan is to make equality work 
correctly. I thought that was clear, but sorry if it was not. Maybe I should 
have pointed it out explicitly.

Equality should definitely be based on all of the inputs that affect the 
output rows. That includes location or table identifier, user schema, filters, 
requested projection, implementation, and other options. I think case class 
equality is correct here.

> there is no consensus about how to explain a data source v2 relation, my 
PR tries to have people focus on this part and have a consensus.

That's a good goal for this PR, so I think you should roll back the changes 
to the relation so equality is not affected by it.

> Do you mean the path option?

Yes. The data that will be scanned is an important part. As you know, I 
think this should be part of the relation itself and not options. That would 
solve this problem.
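
To make the equality point concrete, a tiny standalone sketch with simplified names (not Spark's actual relation class): with a case class, equality is structural over every constructor field, so anything that affects the output rows needs to be a field.

```scala
// Two relations describing the same scan (same source, options, projection, filters)
// compare equal; dropping one of these fields from the class would weaken that.
case class SimpleRelation(
    source: String,
    options: Map[String, String],
    projection: Seq[String],
    filters: Seq[String])

val a = SimpleRelation("parquet", Map("path" -> "/data/t"), Seq("id"), Seq("id > 0"))
val b = SimpleRelation("parquet", Map("path" -> "/data/t"), Seq("id"), Seq("id > 0"))
assert(a == b)  // structural equality over all constructor fields
```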


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20647: [SPARK-23303][SQL] improve the explain result for...

2018-02-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/20647#discussion_r170307903
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
 ---
@@ -23,11 +23,11 @@ import org.apache.spark.sql.execution.SparkPlan
 
 object DataSourceV2Strategy extends Strategy {
   override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
-case relation: DataSourceV2Relation =>
-  DataSourceV2ScanExec(relation.output, relation.reader) :: Nil
+case r: DataSourceV2Relation =>
--- End diff --

Strictly speaking, if I don't change this line, then I need to split the 
next line into 2 lines, as I need to pass more parameters, so it's still a 
2-line diff.

I think tiny things like this are really not worth pointing out during code 
review; we should save our effort and focus on the actual code logic. What do 
you think?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20647
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1021/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20647
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170279950
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with 
BeforeAndAfterAll {
 
   private val preferredHosts = LocationStrategies.PreferConsistent
 
+  private def compactLogs(topic: String, partition: Int, messages: 
Array[(String, String)]) {
+val mockTime = new MockTime()
+// LogCleaner in 0.10 version of Kafka is still expecting the old 
TopicAndPartition api
+val logs = new Pool[TopicAndPartition, Log]()
+val logDir = kafkaTestUtils.brokerLogDir
+val dir = new java.io.File(logDir, topic + "-" + partition)
+dir.mkdirs()
+val logProps = new ju.Properties()
+logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: 
java.lang.Float)
--- End diff --

Do you have to 'cast' this to a Java Float object to get it to compile?
`java.lang.Float.valueOf(0.1f)` works too I guess, but equally weird. OK if 
it's required.
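
For reference, a small standalone sketch of the two equivalent forms, using a hypothetical property key: `Properties.put` takes `Object`s, and a Scala `Float` does not widen to `AnyRef` on its own, so it has to be boxed explicitly one way or the other.

```scala
import java.util.Properties

// Either a type ascription (which triggers the implicit boxing conversion) or an
// explicit valueOf call produces the java.lang.Float that Properties.put needs.
val logProps = new Properties()
logProps.put("some.float.prop", 0.1f: java.lang.Float)          // type ascription
logProps.put("some.float.prop", java.lang.Float.valueOf(0.1f))  // explicit boxing
```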


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170278078
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with 
BeforeAndAfterAll {
 
   private val preferredHosts = LocationStrategies.PreferConsistent
 
+  private def compactLogs(topic: String, partition: Int, messages: 
Array[(String, String)]) {
+val mockTime = new MockTime()
+// LogCleaner in 0.10 version of Kafka is still expecting the old 
TopicAndPartition api
+val logs = new Pool[TopicAndPartition, Log]()
+val logDir = kafkaTestUtils.brokerLogDir
+val dir = new java.io.File(logDir, topic + "-" + partition)
--- End diff --

Import `File` and the other `java.*` classes? Maybe I'm missing a name conflict.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170277915
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 ---
@@ -172,57 +187,138 @@ private[spark] class KafkaRDD[K, V](
 
   override def compute(thePart: Partition, context: TaskContext): 
Iterator[ConsumerRecord[K, V]] = {
 val part = thePart.asInstanceOf[KafkaRDDPartition]
-assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
+require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
 if (part.fromOffset == part.untilOffset) {
   logInfo(s"Beginning offset ${part.fromOffset} is the same as ending 
offset " +
 s"skipping ${part.topic} ${part.partition}")
   Iterator.empty
 } else {
-  new KafkaRDDIterator(part, context)
+  logInfo(s"Computing topic ${part.topic}, partition ${part.partition} 
" +
+s"offsets ${part.fromOffset} -> ${part.untilOffset}")
+  if (compacted) {
+new CompactedKafkaRDDIterator[K, V](
+  part,
+  context,
+  kafkaParams,
+  useConsumerCache,
+  pollTimeout,
+  cacheInitialCapacity,
+  cacheMaxCapacity,
+  cacheLoadFactor
+)
+  } else {
+new KafkaRDDIterator[K, V](
+  part,
+  context,
+  kafkaParams,
+  useConsumerCache,
+  pollTimeout,
+  cacheInitialCapacity,
+  cacheMaxCapacity,
+  cacheLoadFactor
+)
+  }
 }
   }
+}
 
-  /**
-   * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
-   * Uses a cached consumer where possible to take advantage of prefetching
-   */
-  private class KafkaRDDIterator(
-  part: KafkaRDDPartition,
-  context: TaskContext) extends Iterator[ConsumerRecord[K, V]] {
-
-logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " 
+
-  s"offsets ${part.fromOffset} -> ${part.untilOffset}")
-
-val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+/**
+ * An iterator that fetches messages directly from Kafka for the offsets 
in partition.
+ * Uses a cached consumer where possible to take advantage of prefetching
+ */
+private class KafkaRDDIterator[K, V](
+  part: KafkaRDDPartition,
+  context: TaskContext,
+  kafkaParams: ju.Map[String, Object],
+  useConsumerCache: Boolean,
+  pollTimeout: Long,
+  cacheInitialCapacity: Int,
+  cacheMaxCapacity: Int,
+  cacheLoadFactor: Float
+) extends Iterator[ConsumerRecord[K, V]] {
+
+  val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+
+  context.addTaskCompletionListener{ context => closeIfNeeded() }
--- End diff --

This could be `...(_ => closeIfNeeded())`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170279150
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala
 ---
@@ -87,47 +89,60 @@ private[spark] class KafkaRDD[K, V](
 }.toArray
   }
 
-  override def count(): Long = offsetRanges.map(_.count).sum
+  override def count(): Long =
+if (compacted) {
+  super.count()
+} else {
+  offsetRanges.map(_.count).sum
+}
 
   override def countApprox(
   timeout: Long,
   confidence: Double = 0.95
-  ): PartialResult[BoundedDouble] = {
-val c = count
-new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
-  }
+  ): PartialResult[BoundedDouble] =
+if (compacted) {
+  super.countApprox(timeout, confidence)
+} else {
+  val c = count
+  new PartialResult(new BoundedDouble(c, 1.0, c, c), true)
+}
 
-  override def isEmpty(): Boolean = count == 0L
+  override def isEmpty(): Boolean =
+if (compacted) {
+  super.isEmpty()
+} else {
+  count == 0L
+}
 
-  override def take(num: Int): Array[ConsumerRecord[K, V]] = {
-val nonEmptyPartitions = this.partitions
-  .map(_.asInstanceOf[KafkaRDDPartition])
-  .filter(_.count > 0)
+  override def take(num: Int): Array[ConsumerRecord[K, V]] =
+if (compacted) {
+  super.take(num)
+} else {
+  val nonEmptyPartitions = this.partitions
+.map(_.asInstanceOf[KafkaRDDPartition])
+.filter(_.count > 0)
 
-if (num < 1 || nonEmptyPartitions.isEmpty) {
-  return new Array[ConsumerRecord[K, V]](0)
-}
+  if (num < 1 || nonEmptyPartitions.isEmpty) {
--- End diff --

I guess you could check `num < 1` before the map/filter, but it's trivial.
You could write `return Array.empty[ConsumerRecord[K,V]]` too; again 
trivial.
Since this is existing code I could see not touching it as well.
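
A minimal stand-in (simplified types, not the PR's actual classes) of the reordering suggested above: hoist the `num < 1` check before the filtering work and use `Array.empty` for the degenerate cases.

```scala
// Sketch only: early exits before any per-partition work, Array.empty instead of new Array(0).
def takeRecords(num: Int, partitions: Seq[Seq[String]]): Array[String] = {
  if (num < 1) return Array.empty[String]
  val nonEmptyPartitions = partitions.filter(_.nonEmpty)
  if (nonEmptyPartitions.isEmpty) return Array.empty[String]
  nonEmptyPartitions.flatten.take(num).toArray
}
```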


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-02-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r170296358
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java
 ---
@@ -58,7 +58,8 @@ public MemoryBlock allocate(long size) throws 
OutOfMemoryError {
 final long[] array = arrayReference.get();
 if (array != null) {
   assert (array.length * 8L >= size);
-  MemoryBlock memory = new MemoryBlock(array, 
Platform.LONG_ARRAY_OFFSET, size);
+  MemoryBlock memory =
+new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, 
size);
--- End diff --

sure, done


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20663: [SPARK-23475][UI][FOLLOWUP] Refactor AllStagesPage in or...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20663
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87631/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20663: [SPARK-23475][UI][FOLLOWUP] Refactor AllStagesPage in or...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20663
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20663: [SPARK-23475][UI][FOLLOWUP] Refactor AllStagesPage in or...

2018-02-23 Thread vanzin
Github user vanzin commented on the issue:

https://github.com/apache/spark/pull/20663
  
Could you file a separate bug for this cleanup? Thx


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20647: [SPARK-23303][SQL] improve the explain result for...

2018-02-23 Thread marmbrus
Github user marmbrus commented on a diff in the pull request:

https://github.com/apache/spark/pull/20647#discussion_r170185948
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
 ---
@@ -77,31 +79,32 @@ class MicroBatchExecution(
   
sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",")
 
 val _logicalPlan = analyzedPlan.transform {
-  case streamingRelation@StreamingRelation(dataSourceV1, sourceName, 
output) =>
-toExecutionRelationMap.getOrElseUpdate(streamingRelation, {
+  case s @ StreamingRelation(dsV1, sourceName, output) =>
--- End diff --

If you are touching that specific code then it's fine to fix the style, but in 
general I tend to agree that including spurious changes makes the diff harder 
to read and the commit harder to back-port.

I've even seen guidelines that specifically prohibit fixing style just to 
fix style since it obfuscates the history.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20658: [SPARK-23488][python] Add missing catalog methods to pyt...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20658
  
**[Test build #87627 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87627/testReport)**
 for PR 20658 at commit 
[`d7e03cd`](https://github.com/apache/spark/commit/d7e03cd507b58fda8830b580390f5224ee7c8d65).
 * This patch **fails Python style tests**.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20658: [SPARK-23488][python] Add missing catalog methods to pyt...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20658
  
Test FAILed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87627/
Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20647
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20658: [SPARK-23488][python] Add missing catalog methods to pyt...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20658
  
**[Test build #87627 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87627/testReport)**
 for PR 20658 at commit 
[`d7e03cd`](https://github.com/apache/spark/commit/d7e03cd507b58fda8830b580390f5224ee7c8d65).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20647
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 
https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87624/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20658: [SPARK-23488][python] Add missing catalog methods to pyt...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/20658
  
Merged build finished. Test FAILed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20658: [SPARK-23488][python] Add missing catalog methods to pyt...

2018-02-23 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20658
  
ok to test


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/20647
  
**[Test build #87624 has 
finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87624/testReport)**
 for PR 20647 at commit 
[`dbee281`](https://github.com/apache/spark/commit/dbee2813accd4c8f5937b28eb9142cc6a50f8c6a).
 * This patch passes all tests.
 * This patch merges cleanly.
 * This patch adds no public classes.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20648: [SPARK-23448][SQL] JSON parser should return partial row...

2018-02-23 Thread HyukjinKwon
Github user HyukjinKwon commented on the issue:

https://github.com/apache/spark/pull/20648
  
+1 for disallowing it anyway if that was Wenchen's opinion too. Please go 
ahead. I will help double-check anyway.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19222
  
Test PASSed.
Refer to this link for build results (access rights to CI server needed): 

https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1020/
Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-02-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r170296478
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -45,38 +44,149 @@
*/
   public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3;
 
-  private final long length;
+  @Nullable
+  protected Object obj;
+
+  protected long offset;
+
+  protected long length;
 
   /**
* Optional page number; used when this MemoryBlock represents a page 
allocated by a
-   * TaskMemoryManager. This field is public so that it can be modified by 
the TaskMemoryManager,
-   * which lives in a different package.
+   * TaskMemoryManager. This field can be updated using setPageNumber 
method so that
+   * this can be modified by the TaskMemoryManager, which lives in a 
different package.
*/
-  public int pageNumber = NO_PAGE_NUMBER;
+  private int pageNumber = NO_PAGE_NUMBER;
 
   public MemoryBlock(@Nullable Object obj, long offset, long length) {
-super(obj, offset);
+this.obj = obj;
+this.offset = offset;
 this.length = length;
   }
 
+  public MemoryBlock() {
+this(null, 0, 0);
+  }
+
+  public final Object getBaseObject() {
+return obj;
+  }
+
+  public final long getBaseOffset() {
+return offset;
+  }
+
+  public void resetObjAndOffset() {
+this.obj = null;
+this.offset = 0;
+  }
+
   /**
* Returns the size of the memory block.
*/
-  public long size() {
+  public final long size() {
 return length;
   }
 
-  /**
-   * Creates a memory block pointing to the memory used by the long array.
-   */
-  public static MemoryBlock fromLongArray(final long[] array) {
-return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length 
* 8L);
+  public final void setPageNumber(int pageNum) {
+pageNumber = pageNum;
+  }
+
+  public final int getPageNumber() {
+return pageNumber;
   }
 
   /**
* Fills the memory block with the specified byte value.
*/
-  public void fill(byte value) {
+  public final void fill(byte value) {
 Platform.setMemory(obj, offset, length, value);
   }
+
+  /**
+   * Instantiate MemoryBlock for given object type with new offset
+   */
+  public final static MemoryBlock allocateFromObject(Object obj, long 
offset, long length) {
+MemoryBlock mb = null;
+if (obj instanceof byte[]) {
+  byte[] array = (byte[])obj;
+  mb = new ByteArrayMemoryBlock(array, offset, length);
+} else if (obj instanceof long[]) {
+  long[] array = (long[])obj;
+  mb = new OnHeapMemoryBlock(array, offset, length);
+} else if (obj == null) {
+  // we assume that to pass null pointer means off-heap
+  mb = new OffHeapMemoryBlock(offset, length);
+} else {
+  throw new UnsupportedOperationException(obj.getClass() + " is not 
supported now");
+}
+return mb;
+  }
+
+  /**
+   * Instantiate the same type of MemoryBlock with new offset and size
+   */
+  public abstract MemoryBlock allocate(long offset, long size);
+
+
+  public abstract int getInt(long offset);
+
+  public abstract void putInt(long offset, int value);
+
+  public abstract boolean getBoolean(long offset);
+
+  public abstract void putBoolean(long offset, boolean value);
+
+  public abstract byte getByte(long offset);
+
+  public abstract void putByte(long offset, byte value);
+
+  public abstract short getShort(long offset);
+
+  public abstract void putShort(long offset, short value);
+
+  public abstract long getLong(long offset);
+
+  public abstract void putLong(long offset, long value);
+
+  public abstract float getFloat(long offset);
+
+  public abstract void putFloat(long offset, float value);
+
+  public abstract double getDouble(long offset);
+
+  public abstract void putDouble(long offset, double value);
+
+  public abstract Object getObjectVolatile(long offset);
--- End diff --

Yeah, dropped


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-02-23 Thread kiszk
Github user kiszk commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r170296403
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java ---
@@ -22,10 +22,9 @@
 import org.apache.spark.unsafe.Platform;
 
 /**
- * A consecutive block of memory, starting at a {@link MemoryLocation} 
with a fixed size.
+ * A declaration of interfaces of MemoryBlock classes .
--- End diff --

Thanks, done


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2018-02-23 Thread SparkQA
Github user SparkQA commented on the issue:

https://github.com/apache/spark/pull/19222
  
**[Test build #87633 has 
started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87633/testReport)**
 for PR 19222 at commit 
[`5e3afd1`](https://github.com/apache/spark/commit/5e3afd11a2dc76d2cd23264b4052abbf3f5d7e9d).


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...

2018-02-23 Thread AmplabJenkins
Github user AmplabJenkins commented on the issue:

https://github.com/apache/spark/pull/19222
  
Merged build finished. Test PASSed.


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...

2018-02-23 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/20553
  
also cc @cloud-fan @jerryshao 


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20664: [SPARK-23496][CORE] Locality of coalesced partiti...

2018-02-23 Thread mgaido91
Github user mgaido91 commented on a diff in the pull request:

https://github.com/apache/spark/pull/20664#discussion_r170279656
  
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -1129,6 +1129,36 @@ class RDDSuite extends SparkFunSuite with 
SharedSparkContext {
 }.collect()
   }
 
+  test("SPARK-23496: order of input partitions can result in severe skew 
in coalesce") {
--- End diff --

I see, thanks, sorry, I missed it


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...

2018-02-23 Thread cloud-fan
Github user cloud-fan commented on a diff in the pull request:

https://github.com/apache/spark/pull/19222#discussion_r170305944
  
--- Diff: 
common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java
 ---
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.unsafe.memory;
+
+import org.apache.spark.unsafe.Platform;
+
+/**
+ * A consecutive block of memory with a byte array on Java heap.
+ */
+public final class ByteArrayMemoryBlock extends MemoryBlock {
+
+  private final byte[] array;
+
+  public ByteArrayMemoryBlock(byte[] obj, long offset, long length) {
+super(obj, offset, length);
+this.array = obj;
+  }
+
+  @Override
+  public MemoryBlock allocate(long offset, long size) {
+return new ByteArrayMemoryBlock(array, offset, size);
+  }
+
+  public byte[] getByteArray() { return array; }
+
+  /**
+   * Creates a memory block pointing to the memory used by the byte array.
+   */
+  public static ByteArrayMemoryBlock fromArray(final byte[] array) {
+return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, 
array.length);
+  }
+
+
+  public final int getInt(long offset) {
+// UTF8String.getPrefix() assumes data is 4-byte aligned
+assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= ((array.length + 3) 
/ 4) * 4);
+return Platform.getInt(array, offset);
+  }
+
+  public final void putInt(long offset, int value) {
+assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+Platform.putInt(array, offset, value);
+  }
+
+  public final boolean getBoolean(long offset) {
+assert(offset + 1 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+return Platform.getBoolean(array, offset);
+  }
+
+  public final void putBoolean(long offset, boolean value) {
+assert(offset + 1 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+Platform.putBoolean(array, offset, value);
+  }
+
+  public final byte getByte(long offset) {
+return array[(int)(offset - Platform.BYTE_ARRAY_OFFSET)];
+  }
+
+  public final void putByte(long offset, byte value) {
+array[(int)(offset - Platform.BYTE_ARRAY_OFFSET)] = value;
+  }
+
+  public final short getShort(long offset) {
+assert(offset + 2 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+return Platform.getShort(array, offset);
+  }
+
+  public final void putShort(long offset, short value) {
+assert(offset + 2 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+Platform.putShort(array, offset, value);
+  }
+
+  public final long getLong(long offset) {
+// UTF8String.getPrefix() assumes data is 8-byte aligned
+assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= ((array.length + 7) 
/ 8) * 8);
+return Platform.getLong(array, offset);
+  }
+
+  public final void putLong(long offset, long value) {
+assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+Platform.putLong(array, offset, value);
+  }
+
+  public final float getFloat(long offset) {
+assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+return Platform.getFloat(array, offset);
+  }
+
+  public final void putFloat(long offset, float value) {
+assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+Platform.putFloat(array, offset, value);
+  }
+
+  public final double getDouble(long offset) {
+assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+return Platform.getDouble(array, offset);
+  }
+
+  public final void putDouble(long offset, double value) {
+assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= array.length);
+Platform.putDouble(array, offset, value);
+  }
+
+  public final void copyFrom(byte[] src, long srcOffset, long dstOffset, 
long length) {
--- End 

[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...

2018-02-23 Thread jiangxb1987
Github user jiangxb1987 commented on the issue:

https://github.com/apache/spark/pull/20553
  
IIUC, `spark.kubernetes.executor.cores` here is just a special case of 
`spark.executor.cores` for the k8s backend; you would still have to handle 
float values wherever you read `spark.kubernetes.executor.cores`, for instance 
in dynamic allocation. If that is the case, I don't see much benefit in 
bringing in a new conf here.
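
If it helps frame the trade-off, a rough sketch (not the PR's actual code) of the precedence being discussed, where the k8s-specific conf only overrides the generic one for the container cpu request; the fallback logic here is an assumption.

```scala
// Sketch only: prefer the k8s-specific value for the cpu request when present,
// otherwise fall back to spark.executor.cores; everything else keeps using the latter.
def executorCpuRequest(conf: Map[String, String]): String =
  conf.getOrElse("spark.kubernetes.executor.cores",
    conf.getOrElse("spark.executor.cores", "1"))
```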


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170278317
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 ---
@@ -162,17 +162,22 @@ private[kafka010] class KafkaTestUtils extends 
Logging {
   }
 
   /** Create a Kafka topic and wait until it is propagated to the whole 
cluster */
-  def createTopic(topic: String, partitions: Int): Unit = {
-AdminUtils.createTopic(zkUtils, topic, partitions, 1)
+  def createTopic(topic: String, partitions: Int, config: Properties): 
Unit = {
+AdminUtils.createTopic(zkUtils, topic, partitions, 1, config)
 // wait until metadata is propagated
 (0 until partitions).foreach { p =>
   waitUntilMetadataIsPropagated(topic, p)
 }
   }
 
+  /** Create a Kafka topic and wait until it is propagated to the whole 
cluster */
+  def createTopic(topic: String, partitions: Int): Unit = {
+createTopic(topic, partitions, new Properties)
+  }
+
   /** Create a Kafka topic and wait until it is propagated to the whole 
cluster */
   def createTopic(topic: String): Unit = {
-createTopic(topic, 1)
+createTopic(topic, 1, new Properties)
--- End diff --

Nit: `new Properties()`


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170279504
  
--- Diff: 
external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala
 ---
@@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with 
BeforeAndAfterAll {
 
   private val preferredHosts = LocationStrategies.PreferConsistent
 
+  private def compactLogs(topic: String, partition: Int, messages: 
Array[(String, String)]) {
+val mockTime = new MockTime()
+// LogCleaner in 0.10 version of Kafka is still expecting the old 
TopicAndPartition api
+val logs = new Pool[TopicAndPartition, Log]()
+val logDir = kafkaTestUtils.brokerLogDir
+val dir = new java.io.File(logDir, topic + "-" + partition)
+dir.mkdirs()
+val logProps = new ju.Properties()
+logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: 
java.lang.Float)
+val log = new Log(
+  dir,
+  LogConfig(logProps),
+  0L,
+  mockTime.scheduler,
+  mockTime
+)
+messages.foreach { case (k, v) =>
+val msg = new ByteBufferMessageSet(
--- End diff --

Unindent one level?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...

2018-02-23 Thread srowen
Github user srowen commented on a diff in the pull request:

https://github.com/apache/spark/pull/20572#discussion_r170278931
  
--- Diff: 
external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala
 ---
@@ -71,25 +69,62 @@ class CachedKafkaConsumer[K, V] private(
 }
 
 if (!buffer.hasNext()) { poll(timeout) }
-assert(buffer.hasNext(),
+require(buffer.hasNext(),
   s"Failed to get records for $groupId $topic $partition $offset after 
polling for $timeout")
 var record = buffer.next()
 
 if (record.offset != offset) {
   logInfo(s"Buffer miss for $groupId $topic $partition $offset")
   seek(offset)
   poll(timeout)
-  assert(buffer.hasNext(),
+  require(buffer.hasNext(),
 s"Failed to get records for $groupId $topic $partition $offset 
after polling for $timeout")
   record = buffer.next()
-  assert(record.offset == offset,
-s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset")
+  require(record.offset == offset,
+s"Got wrong record for $groupId $topic $partition even after 
seeking to offset $offset " +
+  s"got offset ${record.offset} instead. If this is a compacted 
topic, consider enabling " +
+  "spark.streaming.kafka.allowNonConsecutiveOffsets"
+  )
 }
 
 nextOffset = offset + 1
 record
   }
 
+  /**
+   * Start a batch on a compacted topic
+   */
+  def compactedStart(offset: Long, timeout: Long): Unit = {
+logDebug(s"compacted start $groupId $topic $partition starting 
$offset")
+// This seek may not be necessary, but it's hard to tell due to gaps 
in compacted topics
+if (offset != nextOffset) {
+  logInfo(s"Initial fetch for compacted $groupId $topic $partition 
$offset")
+  seek(offset)
+  poll(timeout)
+}
+  }
+
+  /**
+   * Get the next record in the batch from a compacted topic.
+   * Assumes compactedStart has been called first, and ignores gaps.
+   */
+  def compactedNext(timeout: Long): ConsumerRecord[K, V] = {
+if (!buffer.hasNext()) { poll(timeout) }
--- End diff --

Nit: I'd expand this onto two lines


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


