[GitHub] spark issue #20648: [SPARK-23448][SQL] JSON parser should return partial row...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20648 @HyukjinKwon @cloud-fan Thanks for the comment! Yes, I agree we need to keep the CSV behavior. I will check how much we can clean up with it. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20622 **[Test build #87636 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87636/testReport)** for PR 20622 at commit [`d404baf`](https://github.com/apache/spark/commit/d404bafa869a4950d607200586c88ea17d99f9f5). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20622: [SPARK-23491][SS] Remove explicit job cancellatio...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20622#discussion_r170389827 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -266,6 +264,12 @@ class ContinuousExecution( SQLExecution.withNewExecutionId( sparkSessionForQuery, lastExecution)(lastExecution.toRdd) } +} catch { + case t: Throwable + if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING => +stopSources() +sparkSession.sparkContext.cancelJobGroup(runId.toString) --- End diff -- nit: I think it's cleaner to put this in the `finally` since this is the invariant we want when this entire method terminates. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20622 **[Test build #87637 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87637/testReport)** for PR 20622 at commit [`d3b16c1`](https://github.com/apache/spark/commit/d3b16c11671ba6514360121556ed5554f8bcf890). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20653: [SPARK-23459][SQL] Improve the error message when unknow...
Github user gatorsmile commented on the issue: https://github.com/apache/spark/pull/20653 LGTM Thanks! Merged to master --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20604 **[Test build #87635 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87635/testReport)** for PR 20604 at commit [`35314cb`](https://github.com/apache/spark/commit/35314cbd1cf999a87145c582006699a2ea261e87). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20604 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1022/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19222 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20604: [SPARK-23365][CORE] Do not adjust num executors w...
Github user squito commented on a diff in the pull request: https://github.com/apache/spark/pull/20604#discussion_r170383918 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala --- @@ -55,18 +55,18 @@ private[spark] trait ExecutorAllocationClient { /** * Request that the cluster manager kill the specified executors. * - * When asking the executor to be replaced, the executor loss is considered a failure, and - * killed tasks that are running on the executor will count towards the failure limits. If no - * replacement is being requested, then the tasks will not count towards the limit. - * * @param executorIds identifiers of executors to kill - * @param replace whether to replace the killed executors with new ones, default false + * @param adjustTargetNumExecutors whether the target number of executors will be adjusted down + * after these executors have been killed + * @param countFailures if there are tasks running on the executors when they are killed, whether --- End diff -- whoops, I was supposed to set `countFailures = true` in `sc.killAndReplaceExecutors`, thanks for catching that. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
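For illustration only, here is a rough sketch of the fix squito mentions, written against the trait shown in the diff above; the exact signature and return handling in Spark may differ, so treat the wrapper name and body as assumptions.

```scala
// Hypothetical caller: kill one executor while requesting a replacement.
// The target number of executors is not adjusted down, and tasks running on
// the killed executor still count towards the failure limits.
def killAndReplace(client: ExecutorAllocationClient, executorId: String): Unit = {
  client.killExecutors(
    Seq(executorId),
    adjustTargetNumExecutors = false, // a replacement is expected, keep the target
    countFailures = true)             // the flag squito says should be true here
}
```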
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20604 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...
Github user tdas commented on the issue: https://github.com/apache/spark/pull/20622 LGTM, assuming tests pass. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19222 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1023/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19222 **[Test build #87638 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87638/testReport)** for PR 19222 at commit [`95fbdee`](https://github.com/apache/spark/commit/95fbdee04e9137938cdc76f7f4573116720357f5). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20622: [SPARK-23491][SS] Remove explicit job cancellatio...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20622#discussion_r170392408 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala --- @@ -266,6 +264,12 @@ class ContinuousExecution( SQLExecution.withNewExecutionId( sparkSessionForQuery, lastExecution)(lastExecution.toRdd) } +} catch { + case t: Throwable + if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING => +stopSources() +sparkSession.sparkContext.cancelJobGroup(runId.toString) --- End diff -- So we only swallow the exception when we are reconfiguring (btw, always add logging when swallowing exceptions to leave a trail for debugging), and `stopSources()` and `cancelJobGroup()` can be in the `finally` as we want that as an invariant no matter what happens in this `runContinuous` method. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
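Putting tdas's two comments together, a minimal sketch of the shape being suggested, assuming the surrounding `runContinuous` body and its `state`/`runId` members; the log message is illustrative, not the actual patch.

```scala
try {
  SQLExecution.withNewExecutionId(
    sparkSessionForQuery, lastExecution)(lastExecution.toRdd)
} catch {
  // Swallow the interruption only while reconfiguring, and log it so there
  // is a trail for debugging.
  case t: Throwable
      if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
    logInfo(s"Query $id ignoring exception while reconfiguring: $t") // illustrative message
} finally {
  // Invariant: however runContinuous terminates, stop the sources and cancel
  // any jobs still running in this run's job group.
  stopSources()
  sparkSession.sparkContext.cancelJobGroup(runId.toString)
}
```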
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20604 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87635/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20604 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1024/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20604 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...
Github user rdblue commented on the issue: https://github.com/apache/spark/pull/20647 Thanks for removing the equality methods. This changes equality for the scan and streaming relation, though. Are those significant changes? I still think this should not be committed until the style-only changes are rolled back. This is a significant source of headache for branch maintainers and contributors. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20648: [SPARK-23448][SQL] JSON parser should return partial row...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20648 > Yup, +1 for starting this by disallowing, but to my knowledge R's read.csv allows the case where the length of tokens is shorter than its schema, putting nulls (or NA) into the missing fields, as a valid case. @HyukjinKwon If the length of tokens is longer than its schema, R's read.csv does not seem to raise an error. Is this behavior also what we want? Spark's CSV reader just drops the extra tokens under permissive mode. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
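For reference, a small sketch of the permissive CSV behavior being described; the schema, data, and path below are hypothetical, and a SparkSession named `spark` is assumed.

```scala
import org.apache.spark.sql.types.{StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("a", StringType),
  StructField("b", StringType)))

// Under PERMISSIVE mode, a line with fewer tokens than the schema gets null
// for the missing fields, and a line with more tokens than the schema simply
// has the extra tokens dropped.
val df = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .csv("/tmp/example.csv") // hypothetical path
```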
[GitHub] spark issue #20611: [SPARK-23425][SQL]When wild card is been used in load co...
Github user sujith71955 commented on the issue: https://github.com/apache/spark/pull/20611 @gatorsmile These seem to be random failures; each time a random set of test cases fails. Please let me know if you have any suggestions. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/20632#discussion_r170412046 --- Diff: mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala --- @@ -631,6 +651,160 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext { val expected = Map(0 -> 1.0 / 3.0, 2 -> 2.0 / 3.0) assert(mapToVec(map.toMap) ~== mapToVec(expected) relTol 0.01) } + + test("[SPARK-3159] tree model redundancy - binary classification") { +val numClasses = 2 + +val strategy = new OldStrategy(algo = OldAlgo.Classification, impurity = Gini, maxDepth = 4, + numClasses = numClasses, maxBins = 32) + +val dt = buildRedundantDecisionTree(numClasses, 20, strategy = strategy) + +/* Expected tree structure tested below: + root + left1 right1 +left2 right2 + + pred(left1) = 0 + pred(left2) = 1 + pred(right2) = 0 + */ +assert(dt.rootNode.numDescendants === 4) +assert(dt.rootNode.subtreeDepth === 2) + +assert(dt.rootNode.isInstanceOf[InternalNode]) + +// left 1 prediction test +assert(dt.rootNode.asInstanceOf[InternalNode].leftChild.prediction === 0) + +val right1 = dt.rootNode.asInstanceOf[InternalNode].rightChild +assert(right1.isInstanceOf[InternalNode]) + +// left 2 prediction test +assert(right1.asInstanceOf[InternalNode].leftChild.prediction === 1) +// right 2 prediction test +assert(right1.asInstanceOf[InternalNode].rightChild.prediction === 0) + } + + test("[SPARK-3159] tree model redundancy - multiclass classification") { +val numClasses = 4 + +val strategy = new OldStrategy(algo = OldAlgo.Classification, impurity = Gini, maxDepth = 4, + numClasses = numClasses, maxBins = 32) + +val dt = buildRedundantDecisionTree(numClasses, 20, strategy = strategy) + +/* Expected tree structure tested below: +root +left1 right1 +left2 right2 left3 right3 + + pred(left2) = 0 + pred(right2) = 1 + pred(left3) = 2 + pred(right3) = 1 + */ +assert(dt.rootNode.numDescendants === 6) +assert(dt.rootNode.subtreeDepth === 2) + +assert(dt.rootNode.isInstanceOf[InternalNode]) + +val left1 = dt.rootNode.asInstanceOf[InternalNode].leftChild +val right1 = dt.rootNode.asInstanceOf[InternalNode].rightChild + +assert(left1.isInstanceOf[InternalNode]) + +// left 2 prediction test +assert(left1.asInstanceOf[InternalNode].leftChild.prediction === 0) +// right 2 prediction test +assert(left1.asInstanceOf[InternalNode].rightChild.prediction === 1) + +assert(right1.isInstanceOf[InternalNode]) + +// left 3 prediction test +assert(right1.asInstanceOf[InternalNode].leftChild.prediction === 2) +// right 3 prediction test +assert(right1.asInstanceOf[InternalNode].rightChild.prediction === 1) + } + + test("[SPARK-3159] tree model redundancy - regression") { +val numClasses = 2 + +val strategy = new OldStrategy(algo = OldAlgo.Regression, impurity = Variance, + maxDepth = 3, maxBins = 10, numClasses = numClasses) + +val dt = buildRedundantDecisionTree(numClasses, 20, strategy = strategy) + +/* Expected tree structure tested below: +root +1 2 + 1_1 1_2 2_1 2_2 + 1_1_1 1_1_2 1_2_1 1_2_2 2_1_1 2_1_2 + + pred(1_1_1) = 0.5 + pred(1_1_2) = 0.0 + pred(1_2_1) = 0.0 + pred(1_2_2) = 0.25 + pred(2_1_1) = 1.0 + pred(2_1_2) = 0. + pred(2_2)= 0.5 + */ + +assert(dt.rootNode.numDescendants === 12) --- End diff -- Ok, trying to understand these tests. From what I can tell, you've written a data generator that generates random points, and, somewhat by chance, generates redundant tree nodes if the tree is not pruned. 
You're relying on the random seed to give you a tree which should have exactly 12 descendants after pruning. I think these may be overly complicated. IMO, we just need to test the situation that causes this by creating a simple dataset that can improve the impurity by splitting, but which does not change the prediction. For example, for the Gini impurity you might have the following: ```scala val data = Array( LabeledPoint(0.0, Vectors.dense(1.0)), LabeledPoint(0.0, Vectors.dense(1.0)),
[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/20632#discussion_r170410747 --- Diff: mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala --- @@ -402,20 +405,40 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext { LabeledPoint(1.0, Vectors.dense(2.0))) val input = sc.parallelize(arr) +val seed = 42 +val numTrees = 1 + // Must set maxBins s.t. the feature will be treated as an ordered categorical feature. val strategy = new OldStrategy(algo = OldAlgo.Classification, impurity = Gini, maxDepth = 1, numClasses = 2, categoricalFeaturesInfo = Map(0 -> 3), maxBins = 3) -val model = RandomForest.run(input, strategy, numTrees = 1, featureSubsetStrategy = "all", - seed = 42, instr = None).head -model.rootNode match { - case n: InternalNode => n.split match { -case s: CategoricalSplit => - assert(s.leftCategories === Array(1.0)) -case _ => throw new AssertionError("model.rootNode.split was not a CategoricalSplit") - } - case _ => throw new AssertionError("model.rootNode was not an InternalNode") -} +val metadata = DecisionTreeMetadata.buildMetadata(input, strategy, numTrees = numTrees, + featureSubsetStrategy = "all") +val splits = RandomForest.findSplits(input, metadata, seed = seed) + +val treeInput = TreePoint.convertToTreeRDD(input, splits, metadata) +val baggedInput = BaggedPoint.convertToBaggedRDD(treeInput, + strategy.subsamplingRate, numTrees, false, seed = seed) + +val topNode = LearningNode.emptyNode(nodeIndex = 1) +assert(topNode.isLeaf === false) +assert(topNode.stats === null) + +val nodesForGroup = Map(0 -> Array(topNode)) +val treeToNodeToIndexInfo = Map(0 -> Map( + topNode.id -> new RandomForest.NodeIndexInfo(0, None) +)) +val nodeStack = new mutable.ArrayStack[(Int, LearningNode)] +val bestSplit = RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode), + nodesForGroup, treeToNodeToIndexInfo, splits, nodeStack) + +assert(topNode.split.isDefined, "rootNode does not have a split") --- End diff -- I'm a fan of just calling `foreach` like: ```scala topNode.split.foreach { split => assert(split.isInstanceOf[CategoricalSplit]) assert(split.toOld.categories === Array(1.0)) } ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/20632#discussion_r170410687 --- Diff: mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala --- @@ -402,20 +407,35 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext { LabeledPoint(1.0, Vectors.dense(2.0))) val input = sc.parallelize(arr) +val numTrees = 1 + // Must set maxBins s.t. the feature will be treated as an ordered categorical feature. val strategy = new OldStrategy(algo = OldAlgo.Classification, impurity = Gini, maxDepth = 1, numClasses = 2, categoricalFeaturesInfo = Map(0 -> 3), maxBins = 3) -val model = RandomForest.run(input, strategy, numTrees = 1, featureSubsetStrategy = "all", - seed = 42, instr = None).head -model.rootNode match { - case n: InternalNode => n.split match { -case s: CategoricalSplit => - assert(s.leftCategories === Array(1.0)) -case _ => throw new AssertionError("model.rootNode.split was not a CategoricalSplit") - } - case _ => throw new AssertionError("model.rootNode was not an InternalNode") -} +val metadata = DecisionTreeMetadata.buildMetadata(input, strategy, numTrees = numTrees, + featureSubsetStrategy = "all") +val splits = RandomForest.findSplits(input, metadata, seed = seed) + +val treeInput = TreePoint.convertToTreeRDD(input, splits, metadata) +val baggedInput = BaggedPoint.convertToBaggedRDD(treeInput, + strategy.subsamplingRate, numTrees, false, seed = seed) + +val topNode = LearningNode.emptyNode(nodeIndex = 1) +val nodesForGroup = Map(0 -> Array(topNode)) +val treeToNodeToIndexInfo = Map(0 -> Map( + topNode.id -> new RandomForest.NodeIndexInfo(0, None) +)) +val nodeStack = new mutable.ArrayStack[(Int, LearningNode)] +val bestSplit = RandomForest.findBestSplits(baggedInput, metadata, Map(0 -> topNode), --- End diff -- This method returns unit. Doesn't make sense to assign its output. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/20632#discussion_r170410905 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/tree/DecisionTreeSuite.scala --- @@ -541,7 +541,7 @@ object DecisionTreeSuite extends SparkFunSuite { Array[LabeledPoint] = { val arr = new Array[LabeledPoint](3000) for (i <- 0 until 3000) { - if (i < 1000) { + if (i < 1001) { --- End diff -- this is the type of thing that will puzzle someone down the line. I'm ok with it, though. :stuck_out_tongue_closed_eyes: --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20604 **[Test build #87635 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87635/testReport)** for PR 20604 at commit [`35314cb`](https://github.com/apache/spark/commit/35314cbd1cf999a87145c582006699a2ea261e87). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20604 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/20604 retest this please --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20662: [SPARK-23475][UI][BACKPORT-2.3] Show also skipped stages
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/20662 Merging to 2.3. Please close the PR manually. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20622 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87636/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20622 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20622 **[Test build #87636 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87636/testReport)** for PR 20622 at commit [`d404baf`](https://github.com/apache/spark/commit/d404bafa869a4950d607200586c88ea17d99f9f5). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20622 **[Test build #87637 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87637/testReport)** for PR 20622 at commit [`d3b16c1`](https://github.com/apache/spark/commit/d3b16c11671ba6514360121556ed5554f8bcf890). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19222 **[Test build #87638 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87638/testReport)** for PR 19222 at commit [`95fbdee`](https://github.com/apache/spark/commit/95fbdee04e9137938cdc76f7f4573116720357f5). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20648: [SPARK-23448][SQL] JSON parser should return partial row...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20648 _To me_, I have been roughly thinking that we had better match it to R's read.csv and explicitly document this. I believe this is a good reference that our CSV has resembled so far. BTW, I don't mind doing this separately; whatever you think is right. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20648: [SPARK-23448][SQL] JSON parser should return partial row...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20648 I think at least we should update the documentation for this behavior of the CSV reader. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/20632#discussion_r170410775 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -283,10 +292,12 @@ private[tree] class LearningNode( // Here we want to keep same behavior with the old mllib.DecisionTreeModel new LeafNode(stats.impurityCalculator.predict, -1.0, stats.impurityCalculator) } - } } + /** @return true iff a node is a leaf. */ + private def isLeafNode(): Boolean = leftChild.isEmpty && rightChild.isEmpty --- End diff -- I wouldn't mind just removing this change. What constitutes a leaf node is now fuzzy, and if you just inline it in the one place it's used there is no confusion. At any rate, you don't need the parentheses after the method name. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/20632#discussion_r170412098 --- Diff: mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala --- @@ -631,6 +651,160 @@ class RandomForestSuite extends SparkFunSuite with MLlibTestSparkContext { val expected = Map(0 -> 1.0 / 3.0, 2 -> 2.0 / 3.0) assert(mapToVec(map.toMap) ~== mapToVec(expected) relTol 0.01) } + + test("[SPARK-3159] tree model redundancy - binary classification") { +val numClasses = 2 + +val strategy = new OldStrategy(algo = OldAlgo.Classification, impurity = Gini, maxDepth = 4, + numClasses = numClasses, maxBins = 32) + +val dt = buildRedundantDecisionTree(numClasses, 20, strategy = strategy) + +/* Expected tree structure tested below: + root + left1 right1 +left2 right2 + + pred(left1) = 0 + pred(left2) = 1 + pred(right2) = 0 + */ +assert(dt.rootNode.numDescendants === 4) +assert(dt.rootNode.subtreeDepth === 2) + +assert(dt.rootNode.isInstanceOf[InternalNode]) + +// left 1 prediction test +assert(dt.rootNode.asInstanceOf[InternalNode].leftChild.prediction === 0) + +val right1 = dt.rootNode.asInstanceOf[InternalNode].rightChild +assert(right1.isInstanceOf[InternalNode]) + +// left 2 prediction test +assert(right1.asInstanceOf[InternalNode].leftChild.prediction === 1) +// right 2 prediction test +assert(right1.asInstanceOf[InternalNode].rightChild.prediction === 0) + } + + test("[SPARK-3159] tree model redundancy - multiclass classification") { +val numClasses = 4 + +val strategy = new OldStrategy(algo = OldAlgo.Classification, impurity = Gini, maxDepth = 4, + numClasses = numClasses, maxBins = 32) + +val dt = buildRedundantDecisionTree(numClasses, 20, strategy = strategy) + +/* Expected tree structure tested below: +root +left1 right1 +left2 right2 left3 right3 + + pred(left2) = 0 + pred(right2) = 1 + pred(left3) = 2 + pred(right3) = 1 + */ +assert(dt.rootNode.numDescendants === 6) +assert(dt.rootNode.subtreeDepth === 2) + +assert(dt.rootNode.isInstanceOf[InternalNode]) + +val left1 = dt.rootNode.asInstanceOf[InternalNode].leftChild +val right1 = dt.rootNode.asInstanceOf[InternalNode].rightChild + +assert(left1.isInstanceOf[InternalNode]) + +// left 2 prediction test +assert(left1.asInstanceOf[InternalNode].leftChild.prediction === 0) +// right 2 prediction test +assert(left1.asInstanceOf[InternalNode].rightChild.prediction === 1) + +assert(right1.isInstanceOf[InternalNode]) + +// left 3 prediction test +assert(right1.asInstanceOf[InternalNode].leftChild.prediction === 2) +// right 3 prediction test +assert(right1.asInstanceOf[InternalNode].rightChild.prediction === 1) + } + + test("[SPARK-3159] tree model redundancy - regression") { +val numClasses = 2 + +val strategy = new OldStrategy(algo = OldAlgo.Regression, impurity = Variance, + maxDepth = 3, maxBins = 10, numClasses = numClasses) + +val dt = buildRedundantDecisionTree(numClasses, 20, strategy = strategy) + +/* Expected tree structure tested below: +root +1 2 + 1_1 1_2 2_1 2_2 + 1_1_1 1_1_2 1_2_1 1_2_2 2_1_1 2_1_2 + + pred(1_1_1) = 0.5 + pred(1_1_2) = 0.0 + pred(1_2_1) = 0.0 + pred(1_2_2) = 0.25 + pred(2_1_1) = 1.0 + pred(2_1_2) = 0. + pred(2_2)= 0.5 + */ + +assert(dt.rootNode.numDescendants === 12) --- End diff -- The tree tests are already so long and complicated that I think it's important to simplify where possible. These tests are useful as they are, but it probably won't be obvious why/how they work to future devs. 
Also, if we can avoid adding data generation code, that would be nice (there's already tons of code like that lying around the test suites). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/20632#discussion_r170410834 --- Diff: mllib/src/main/scala/org/apache/spark/ml/tree/Node.scala --- @@ -270,11 +269,21 @@ private[tree] class LearningNode( * Convert this [[LearningNode]] to a regular [[Node]], and recurse on any children. */ def toNode: Node = { -if (leftChild.nonEmpty) { - assert(rightChild.nonEmpty && split.nonEmpty && stats != null, + +// convert to an inner node only when: +// -) the node is not a leaf, and +// -) the subtree rooted at this node cannot be replaced by a single leaf +// (i.e., there at least two different leaf predictions appear in the subtree) --- End diff -- This comment seems out of place now. You might just say `// when both children make the same prediction, collapse into single leaf` or something similar below the first case statement. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
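A rough sketch of the collapsing logic under discussion, assuming the `LearningNode` fields shown in the diff; `buildInternalNode` is hypothetical shorthand for the unchanged internal-node construction, not the actual patch.

```scala
def toNode: Node = (leftChild, rightChild) match {
  case (Some(l), Some(r)) =>
    require(split.nonEmpty && stats != null)
    (l.toNode, r.toNode) match {
      // When both children make the same prediction, collapse into a single leaf.
      case (lc: LeafNode, rc: LeafNode) if lc.prediction == rc.prediction =>
        new LeafNode(lc.prediction, stats.impurity, stats.impurityCalculator)
      case (lc, rc) =>
        buildInternalNode(lc, rc) // hypothetical helper
    }
  case _ =>
    // Leaf: keep the same behavior as the old mllib.DecisionTreeModel.
    new LeafNode(stats.impurityCalculator.predict, -1.0, stats.impurityCalculator)
}
```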
[GitHub] spark pull request #20632: [SPARK-3159] added subtree pruning in the transla...
Github user sethah commented on a diff in the pull request: https://github.com/apache/spark/pull/20632#discussion_r170410851 --- Diff: mllib/src/test/scala/org/apache/spark/ml/tree/impl/RandomForestSuite.scala --- @@ -18,17 +18,20 @@ package org.apache.spark.ml.tree.impl import scala.collection.mutable +import scala.util.Random -import org.apache.spark.SparkFunSuite +import org.apache.spark.{SparkContext, SparkFunSuite} import org.apache.spark.ml.classification.DecisionTreeClassificationModel import org.apache.spark.ml.feature.LabeledPoint import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.tree._ import org.apache.spark.ml.util.TestingUtils._ import org.apache.spark.mllib.tree.{DecisionTreeSuite => OldDTSuite, EnsembleTestHelper} import org.apache.spark.mllib.tree.configuration.{Algo => OldAlgo, QuantileStrategy, Strategy => OldStrategy} +import org.apache.spark.mllib.tree.configuration.FeatureType._ --- End diff -- unused --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20647 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1025/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...
Github user cloud-fan commented on the issue: https://github.com/apache/spark/pull/20647 > This changes equality for the scan and streaming relation, though. If we think this is the right equality for `DataSourceV2Relation`, it should also be the right equality for the scan and streaming relation. They should be consistent. I've rolled back the unnecessary style-only changes, but left the one that cleans up unused imports. I think this should be encouraged: it's very hard to spot unused imports during code review, so people should find and fix them while touching files in the PR, and some IDEs can even do it automatically when you save the file. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20647 **[Test build #87640 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87640/testReport)** for PR 20647 at commit [`a73370a`](https://github.com/apache/spark/commit/a73370a5bf56f45ce67cd6cdaf86b53a14a67b5b). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20663: [SPARK-23501][UI] Refactor AllStagesPage in order...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20663#discussion_r170408040 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala --- @@ -143,76 +72,105 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { Seq.empty[Node] } } -if (shouldShowActiveStages) { - content ++= - - - -Active Stages ({activeStages.size}) - - ++ - - {activeStagesTable.toNodeSeq} - -} -if (shouldShowPendingStages) { - content ++= - - - -Pending Stages ({pendingStages.size}) - - ++ - - {pendingStagesTable.toNodeSeq} - + +tables.flatten.foreach(content ++= _) --- End diff -- `content ++= tables.flatten`? But I think this would be better as: ``` val summary = blah val pools = if (sc.isDefined && isFairScheduler) ... else ... val stages = tables.flatten val content = summary ++ pools ++ stages ``` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20663: [SPARK-23501][UI] Refactor AllStagesPage in order...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20663#discussion_r170407867 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala --- @@ -143,76 +72,105 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { Seq.empty[Node] } } -if (shouldShowActiveStages) { - content ++= - - - -Active Stages ({activeStages.size}) - - ++ - - {activeStagesTable.toNodeSeq} - -} -if (shouldShowPendingStages) { - content ++= - - - -Pending Stages ({pendingStages.size}) - - ++ - - {pendingStagesTable.toNodeSeq} - + +tables.flatten.foreach(content ++= _) + +UIUtils.headerSparkPage("Stages for All Jobs", content, parent) + } + + def summaryAndTableForStatus( + status: StageStatus, + request: HttpServletRequest): (Option[Elem], Option[NodeSeq]) = { +val stages = if (status == StageStatus.FAILED) { + allStages.filter(_.status == status).reverse +} else { + allStages.filter(_.status == status) } -if (shouldShowCompletedStages) { - content ++= - - - -Completed Stages ({completedStageNumStr}) - - ++ - - {completedStagesTable.toNodeSeq} - + +if (stages.isEmpty) { + (None, None) +} else { + val killEnabled = status == StageStatus.ACTIVE && parent.killEnabled + val isFailedStage = status == StageStatus.FAILED + + val stagesTable = +new StageTableBase(parent.store, request, stages, tableHeaderID(status), stageTag(status), + parent.basePath, subPath, parent.isFairScheduler, killEnabled, isFailedStage) + val stagesSize = stages.size + (Some(summary(status, stagesSize)), Some(table(status, stagesTable, stagesSize))) } -if (shouldShowSkippedStages) { - content ++= - - - -Skipped Stages ({skippedStages.size}) - - ++ - - {skippedStagesTable.toNodeSeq} - + } + + private def tableHeaderID(status: StageStatus): String = status match { --- End diff -- All these look very similar. Having a single one that does the mapping and have the others call that method would be nice. e.g. ``` def stageTag(status: StageStatus) = s"${statusName(status)}Stage" ``` Then you could also get rid of `classSuffix`, for example, since it's only really called in one place, and the new implementation would be much simpler. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
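One possible reading of vanzin's suggestion, sketched out; the `statusName` helper is an assumption, and only the mappings themselves are taken from the diff.

```scala
// Hypothetical single mapping from which the other helpers are derived.
private def statusName(status: StageStatus): String = status match {
  case StageStatus.ACTIVE => "active"
  case StageStatus.COMPLETE => "completed"
  case StageStatus.FAILED => "failed"
  case StageStatus.PENDING => "pending"
  case StageStatus.SKIPPED => "skipped"
}

// The per-status helpers then become one-liners.
private def tableHeaderID(status: StageStatus): String = statusName(status)
private def stageTag(status: StageStatus): String = s"${statusName(status)}Stage"
private def headerDescription(status: StageStatus): String = statusName(status).capitalize
private def classSuffix(status: StageStatus): String = s"${headerDescription(status)}Stages"
```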
[GitHub] spark pull request #20663: [SPARK-23501][UI] Refactor AllStagesPage in order...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20663#discussion_r170407936 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala --- @@ -143,76 +72,105 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { Seq.empty[Node] } } -if (shouldShowActiveStages) { - content ++= - - - -Active Stages ({activeStages.size}) - - ++ - - {activeStagesTable.toNodeSeq} - -} -if (shouldShowPendingStages) { - content ++= - - - -Pending Stages ({pendingStages.size}) - - ++ - - {pendingStagesTable.toNodeSeq} - + +tables.flatten.foreach(content ++= _) + +UIUtils.headerSparkPage("Stages for All Jobs", content, parent) + } + + def summaryAndTableForStatus( + status: StageStatus, + request: HttpServletRequest): (Option[Elem], Option[NodeSeq]) = { +val stages = if (status == StageStatus.FAILED) { + allStages.filter(_.status == status).reverse +} else { + allStages.filter(_.status == status) } -if (shouldShowCompletedStages) { - content ++= - - - -Completed Stages ({completedStageNumStr}) - - ++ - - {completedStagesTable.toNodeSeq} - + +if (stages.isEmpty) { + (None, None) +} else { + val killEnabled = status == StageStatus.ACTIVE && parent.killEnabled + val isFailedStage = status == StageStatus.FAILED + + val stagesTable = +new StageTableBase(parent.store, request, stages, tableHeaderID(status), stageTag(status), + parent.basePath, subPath, parent.isFairScheduler, killEnabled, isFailedStage) + val stagesSize = stages.size + (Some(summary(status, stagesSize)), Some(table(status, stagesTable, stagesSize))) } -if (shouldShowSkippedStages) { - content ++= - - - -Skipped Stages ({skippedStages.size}) - - ++ - - {skippedStagesTable.toNodeSeq} - + } + + private def tableHeaderID(status: StageStatus): String = status match { +case StageStatus.ACTIVE => "active" +case StageStatus.COMPLETE => "completed" +case StageStatus.FAILED => "failed" +case StageStatus.PENDING => "pending" +case StageStatus.SKIPPED => "skipped" + } + + private def stageTag(status: StageStatus): String = status match { +case StageStatus.ACTIVE => "activeStage" +case StageStatus.COMPLETE => "completedStage" +case StageStatus.FAILED => "failedStage" +case StageStatus.PENDING => "pendingStage" +case StageStatus.SKIPPED => "skippedStage" + } + + private def headerDescription(status: StageStatus): String = status match { +case StageStatus.ACTIVE => "Active" +case StageStatus.COMPLETE => "Completed" +case StageStatus.FAILED => "Failed" +case StageStatus.PENDING => "Pending" +case StageStatus.SKIPPED => "Skipped" + } + + private def classSuffix(status: StageStatus): String = status match { +case StageStatus.ACTIVE => "ActiveStages" +case StageStatus.COMPLETE => "CompletedStages" +case StageStatus.FAILED => "FailedStages" +case StageStatus.PENDING => "PendingStages" +case StageStatus.SKIPPED => "SkippedStages" + } + + private def summaryContent(status: StageStatus, size: Int): String = { +if (status == StageStatus.COMPLETE +&& appSummary.numCompletedStages != size) { + s"${appSummary.numCompletedStages}, only showing $size" +} else { + s"$size" } -if (shouldShowFailedStages) { - content ++= - - - -Failed Stages ({numFailedStages}) - - ++ - - {failedStagesTable.toNodeSeq} - + } + + private def summary(status: StageStatus, size: Int): Elem = { +val summary = + + + {headerDescription(status)} Stages: + +{summaryContent(status, size)} + + +if (status == StageStatus.COMPLETE) { + summary % Attribute(None, "id", Text("completed-summary"), Null) --- End diff -- In the 
previous code this was also the case for `SKIPPED`, are you changing
[GitHub] spark pull request #20663: [SPARK-23501][UI] Refactor AllStagesPage in order...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20663#discussion_r170407811 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala --- @@ -19,46 +19,22 @@ package org.apache.spark.ui.jobs import javax.servlet.http.HttpServletRequest -import scala.xml.{Node, NodeSeq} +import scala.xml.{Attribute, Elem, Node, NodeSeq, Null, Text} import org.apache.spark.scheduler.Schedulable import org.apache.spark.status.PoolData -import org.apache.spark.status.api.v1._ +import org.apache.spark.status.api.v1.StageStatus import org.apache.spark.ui.{UIUtils, WebUIPage} /** Page showing list of all ongoing and recently finished stages and pools */ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { private val sc = parent.sc + private lazy val allStages = parent.store.stageList(null) --- End diff -- IIRC the class (`AllStagesPage`) is only instantiated once, and the `render` method is called for each request. So this won't really work. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
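A minimal sketch of the consequence vanzin points out (names taken from the surrounding diff; the body is heavily abbreviated): because `AllStagesPage` lives for the lifetime of the UI while `render` runs once per request, the stage list has to be fetched inside `render` rather than cached in a `lazy val`.

```scala
def render(request: HttpServletRequest): Seq[Node] = {
  // Re-read the stage list on every request; a lazy val on the page object
  // would be computed once and never reflect later stage updates.
  val allStages = parent.store.stageList(null)
  val content = allStages.groupBy(_.status).toSeq.flatMap { case (status, stages) =>
    <h4>{s"$status Stages (${stages.size})"}</h4>
  }
  UIUtils.headerSparkPage("Stages for All Jobs", content, parent)
}
```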
[GitHub] spark pull request #20663: [SPARK-23501][UI] Refactor AllStagesPage in order...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/20663#discussion_r170407883 --- Diff: core/src/main/scala/org/apache/spark/ui/jobs/AllStagesPage.scala --- @@ -143,76 +72,105 @@ private[ui] class AllStagesPage(parent: StagesTab) extends WebUIPage("") { Seq.empty[Node] } } -if (shouldShowActiveStages) { - content ++= - - - -Active Stages ({activeStages.size}) - - ++ - - {activeStagesTable.toNodeSeq} - -} -if (shouldShowPendingStages) { - content ++= - - - -Pending Stages ({pendingStages.size}) - - ++ - - {pendingStagesTable.toNodeSeq} - + +tables.flatten.foreach(content ++= _) + +UIUtils.headerSparkPage("Stages for All Jobs", content, parent) + } + + def summaryAndTableForStatus( + status: StageStatus, + request: HttpServletRequest): (Option[Elem], Option[NodeSeq]) = { +val stages = if (status == StageStatus.FAILED) { + allStages.filter(_.status == status).reverse +} else { + allStages.filter(_.status == status) } -if (shouldShowCompletedStages) { - content ++= - - - -Completed Stages ({completedStageNumStr}) - - ++ - - {completedStagesTable.toNodeSeq} - + +if (stages.isEmpty) { + (None, None) +} else { + val killEnabled = status == StageStatus.ACTIVE && parent.killEnabled + val isFailedStage = status == StageStatus.FAILED + + val stagesTable = +new StageTableBase(parent.store, request, stages, tableHeaderID(status), stageTag(status), + parent.basePath, subPath, parent.isFairScheduler, killEnabled, isFailedStage) + val stagesSize = stages.size + (Some(summary(status, stagesSize)), Some(table(status, stagesTable, stagesSize))) } -if (shouldShowSkippedStages) { - content ++= - - - -Skipped Stages ({skippedStages.size}) - - ++ - - {skippedStagesTable.toNodeSeq} - + } + + private def tableHeaderID(status: StageStatus): String = status match { +case StageStatus.ACTIVE => "active" +case StageStatus.COMPLETE => "completed" +case StageStatus.FAILED => "failed" +case StageStatus.PENDING => "pending" +case StageStatus.SKIPPED => "skipped" + } + + private def stageTag(status: StageStatus): String = status match { +case StageStatus.ACTIVE => "activeStage" +case StageStatus.COMPLETE => "completedStage" +case StageStatus.FAILED => "failedStage" +case StageStatus.PENDING => "pendingStage" +case StageStatus.SKIPPED => "skippedStage" + } + + private def headerDescription(status: StageStatus): String = status match { +case StageStatus.ACTIVE => "Active" +case StageStatus.COMPLETE => "Completed" +case StageStatus.FAILED => "Failed" +case StageStatus.PENDING => "Pending" +case StageStatus.SKIPPED => "Skipped" + } + + private def classSuffix(status: StageStatus): String = status match { +case StageStatus.ACTIVE => "ActiveStages" +case StageStatus.COMPLETE => "CompletedStages" +case StageStatus.FAILED => "FailedStages" +case StageStatus.PENDING => "PendingStages" +case StageStatus.SKIPPED => "SkippedStages" + } + + private def summaryContent(status: StageStatus, size: Int): String = { +if (status == StageStatus.COMPLETE +&& appSummary.numCompletedStages != size) { --- End diff -- Fits in previous line. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20622 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87637/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20622: [SPARK-23491][SS] Remove explicit job cancellation from ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20622 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20647 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20604 **[Test build #87639 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87639/testReport)** for PR 20604 at commit [`35314cb`](https://github.com/apache/spark/commit/35314cbd1cf999a87145c582006699a2ea261e87). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19222 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87638/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19222 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20604 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87639/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20604 **[Test build #87639 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87639/testReport)** for PR 20604 at commit [`35314cb`](https://github.com/apache/spark/commit/35314cbd1cf999a87145c582006699a2ea261e87). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20604: [SPARK-23365][CORE] Do not adjust num executors when kil...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20604 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser behavior ...
Github user viirya commented on the issue: https://github.com/apache/spark/pull/20666 cc @cloud-fan @HyukjinKwon To keep the CSV reader's behavior for corrupted records, we won't bother with refactoring. But we should update the documentation and explicitly state that partial results are disabled for corrupted records. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
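For reference, a small sketch of what the documented behavior amounts to on the JSON side (schema, data, and path below are hypothetical; a SparkSession named `spark` is assumed): under PERMISSIVE mode a corrupted record yields nulls for all data columns plus the raw text in the corrupt-record column, rather than a partially parsed row.

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val schema = StructType(Seq(
  StructField("a", IntegerType),
  StructField("b", IntegerType),
  StructField("_corrupt_record", StringType)))

// Input line:  {"a": 1, "b": "oops"}
// Result row:  a = null, b = null, _corrupt_record = the raw line
// (no partial result such as a = 1), which is the JSON behavior to document.
val df = spark.read
  .schema(schema)
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "_corrupt_record")
  .json("/tmp/example.json") // hypothetical path
```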
[GitHub] spark issue #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser behavior ...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20666 **[Test build #87641 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87641/testReport)** for PR 20666 at commit [`4ad330b`](https://github.com/apache/spark/commit/4ad330b1def558e17dfb693d428e1bd69248e5a3). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser behavior ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20666 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser behavior ...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20666 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1026/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20666: [SPARK-23448][SQL] Clarify JSON and CSV parser be...
GitHub user viirya opened a pull request: https://github.com/apache/spark/pull/20666 [SPARK-23448][SQL] Clarify JSON and CSV parser behavior in document ## What changes were proposed in this pull request? Clarify JSON and CSV reader behavior in the documentation. JSON doesn't support partial results for corrupted records. CSV only supports partial results for records with more or fewer tokens than the schema. ## How was this patch tested? Pass existing tests. You can merge this pull request into a Git repository by running: $ git pull https://github.com/viirya/spark-1 SPARK-23448-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/20666.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20666 commit 4ad330b1def558e17dfb693d428e1bd69248e5a3 Author: Liang-Chi Hsieh Date: 2018-02-24T07:15:11Z Clarify JSON and CSV parser behavior. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20647: [SPARK-23303][SQL] improve the explain result for...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20647#discussion_r170311026 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala --- @@ -23,11 +23,11 @@ import org.apache.spark.sql.execution.SparkPlan object DataSourceV2Strategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { -case relation: DataSourceV2Relation => - DataSourceV2ScanExec(relation.output, relation.reader) :: Nil +case r: DataSourceV2Relation => --- End diff -- I'm pointing it out because I think it is significant. This is also why I sent a note to the dev list. Changes like this make it much harder to work with Spark because little things change that are not necessary and cause conflicts. That's a problem not just when I'm maintaining a branch, but also when I'm trying to get a PR committed. These changes cause more work because we have to rebase PRs and update for variables that have changed names in the name of style. In this particular case, I would probably wrap the line that is too long. If you want to rename to fix it, I'd consider that reasonable. But all the other cases that aren't necessary should be reverted. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20647 **[Test build #87634 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87634/testReport)** for PR 20647 at commit [`fc29f8f`](https://github.com/apache/spark/commit/fc29f8f7b5aa1bd33f5b4aa1ffd73bd9d84afc15). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20663: [SPARK-23475][UI][FOLLOWUP] Refactor AllStagesPage in or...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20663 **[Test build #87631 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87631/testReport)** for PR 20663 at commit [`d246df2`](https://github.com/apache/spark/commit/d246df283e9a900cf114fcdf0eee2951b1bd3713). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r170318693 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a byte array on Java heap. + */ +public final class ByteArrayMemoryBlock extends MemoryBlock { + + private final byte[] array; + + public ByteArrayMemoryBlock(byte[] obj, long offset, long length) { +super(obj, offset, length); +this.array = obj; + } + + @Override + public MemoryBlock allocate(long offset, long size) { +return new ByteArrayMemoryBlock(array, offset, size); + } + + public byte[] getByteArray() { return array; } + + /** + * Creates a memory block pointing to the memory used by the byte array. + */ + public static ByteArrayMemoryBlock fromArray(final byte[] array) { +return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length); + } + + + public final int getInt(long offset) { +// UTF8String.getPrefix() assumes data is 4-byte aligned +assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= ((array.length + 3) / 4) * 4); +return Platform.getInt(array, offset); + } + + public final void putInt(long offset, int value) { +assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= array.length); +Platform.putInt(array, offset, value); + } + + public final boolean getBoolean(long offset) { +assert(offset + 1 - Platform.BYTE_ARRAY_OFFSET <= array.length); +return Platform.getBoolean(array, offset); + } + + public final void putBoolean(long offset, boolean value) { +assert(offset + 1 - Platform.BYTE_ARRAY_OFFSET <= array.length); +Platform.putBoolean(array, offset, value); + } + + public final byte getByte(long offset) { +return array[(int)(offset - Platform.BYTE_ARRAY_OFFSET)]; + } + + public final void putByte(long offset, byte value) { +array[(int)(offset - Platform.BYTE_ARRAY_OFFSET)] = value; + } + + public final short getShort(long offset) { +assert(offset + 2 - Platform.BYTE_ARRAY_OFFSET <= array.length); +return Platform.getShort(array, offset); + } + + public final void putShort(long offset, short value) { +assert(offset + 2 - Platform.BYTE_ARRAY_OFFSET <= array.length); +Platform.putShort(array, offset, value); + } + + public final long getLong(long offset) { +// UTF8String.getPrefix() assumes data is 8-byte aligned +assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= ((array.length + 7) / 8) * 8); +return Platform.getLong(array, offset); + } + + public final void putLong(long offset, long value) { +assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= array.length); +Platform.putLong(array, offset, value); + } + 
+ public final float getFloat(long offset) { +assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= array.length); +return Platform.getFloat(array, offset); + } + + public final void putFloat(long offset, float value) { +assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= array.length); +Platform.putFloat(array, offset, value); + } + + public final double getDouble(long offset) { +assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= array.length); +return Platform.getDouble(array, offset); + } + + public final void putDouble(long offset, double value) { +assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= array.length); +Platform.putDouble(array, offset, value); + } + + public final void copyFrom(byte[] src, long srcOffset, long dstOffset, long length) { --- End
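Editor's note: the relaxed asserts in `getInt`/`getLong` above allow a read that starts inside the array but ends in the word-aligned padding, because `UTF8String.getPrefix()` reads whole 4- or 8-byte words. A minimal sketch of the round-up arithmetic those asserts rely on (the helper name is ours, not part of the patch):

```scala
// Sketch only: the round-up-to-word arithmetic used by the asserts quoted above.
// `wordAlignedLength` is a hypothetical helper, not part of the Spark code base.
object AlignmentSketch {
  // Round `length` up to the next multiple of `wordSize` (4 for getInt, 8 for getLong).
  def wordAlignedLength(length: Int, wordSize: Int): Int =
    ((length + wordSize - 1) / wordSize) * wordSize

  def main(args: Array[String]): Unit = {
    // With a 13-byte array, a 4-byte read at the last valid offset touches bytes 12..15:
    // past array.length (13) but within the padded length (16), hence the relaxed bound.
    println(wordAlignedLength(13, 4))  // 16
    println(wordAlignedLength(13, 8))  // 16
    println(wordAlignedLength(16, 8))  // 16
  }
}
```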
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user liyinan926 commented on the issue: https://github.com/apache/spark/pull/20553 `spark.kubernetes.executor.cores` has nothing to do with dynamic resource allocation. It's just a way of letting users specify a value for the cpu resource request that conforms to the Kubernetes convention, and it is only read/used when determining the cpu request. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
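Editor's note: for context, a sketch of how the two settings might be supplied together. The property name comes from this PR; the values are purely illustrative, and the exact accepted syntax (fractional vs. millicpu) is whatever the PR settles on.

```scala
import org.apache.spark.SparkConf

// Illustrative only: spark.executor.cores keeps driving task scheduling, while the
// k8s-specific property (if set) is what the backend would use for the pod's cpu request.
val conf = new SparkConf()
  .set("spark.executor.cores", "1")
  .set("spark.kubernetes.executor.cores", "500m") // Kubernetes-style fractional/millicpu value
```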
[GitHub] spark pull request #20647: [SPARK-23303][SQL] improve the explain result for...
Github user rdblue commented on a diff in the pull request: https://github.com/apache/spark/pull/20647#discussion_r170307194 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala --- @@ -35,15 +35,14 @@ case class DataSourceV2Relation( options: Map[String, String], projection: Seq[AttributeReference], filters: Option[Seq[Expression]] = None, -userSpecifiedSchema: Option[StructType] = None) extends LeafNode with MultiInstanceRelation { +userSpecifiedSchema: Option[StructType] = None) + extends LeafNode with MultiInstanceRelation with DataSourceV2QueryPlan { --- End diff -- Part of the utility of an immutable plan is to make equality work correctly. I thought that was clear, but sorry if it was not. Maybe I should have pointed it out explicitly. Equality should definitely be based on all of the inputs that affect the output rows. That includes location or table identifier, user schema, filters, requested projection, implementation, and other options. I think case class equality is correct here. > there is no consensus about how to explain a data source v2 relation, my PR tries to have people focus on this part and have a consensus. That's a good goal for this PR, so I think you should roll back the changes to the relation so equality is not affected by it. > Do you mean the path option? Yes. The data that will be scanned is an important part. As you know, I think this should be part of the relation itself and not options. That would solve this problem. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
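Editor's note: the point about case class equality can be seen in a stripped-down sketch (a simplified stand-in, not the real `DataSourceV2Relation`): equality is structural over every constructor parameter, so it holds exactly when all of the inputs that affect the output rows match.

```scala
// Simplified stand-in for the relation: case class equality compares all fields.
case class RelationSketch(
    options: Map[String, String],
    projection: Seq[String],
    filters: Seq[String],
    userSpecifiedSchema: Option[String])

object RelationSketchDemo extends App {
  val a = RelationSketch(Map("path" -> "/data/t"), Seq("id"), Seq("id > 0"), None)
  val b = RelationSketch(Map("path" -> "/data/t"), Seq("id"), Seq("id > 0"), None)
  assert(a == b)                            // equal: every field that affects output rows matches
  assert(a != a.copy(filters = Seq.empty))  // any differing input breaks equality
}
```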
[GitHub] spark pull request #20647: [SPARK-23303][SQL] improve the explain result for...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/20647#discussion_r170307903 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala --- @@ -23,11 +23,11 @@ import org.apache.spark.sql.execution.SparkPlan object DataSourceV2Strategy extends Strategy { override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { -case relation: DataSourceV2Relation => - DataSourceV2ScanExec(relation.output, relation.reader) :: Nil +case r: DataSourceV2Relation => --- End diff -- Strictly speaking, if I don't change this line, then I need to split the next line into 2 lines, as I need to pass more parameters, so it's still a 2-line diff. I think tiny things like this are really not worth pointing out during code review; we should save our effort to focus more on the actual code logic. What do you think? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20647 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1021/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20647 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170279950 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala --- @@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private val preferredHosts = LocationStrategies.PreferConsistent + private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) { +val mockTime = new MockTime() +// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api +val logs = new Pool[TopicAndPartition, Log]() +val logDir = kafkaTestUtils.brokerLogDir +val dir = new java.io.File(logDir, topic + "-" + partition) +dir.mkdirs() +val logProps = new ju.Properties() +logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) +logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float) --- End diff -- Do you have to 'cast' this to a Java Float object to get it to compile? `java.lang.Float.valueOf(0.1f)` works too I guess, but equally weird. OK if it's required. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
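Editor's note: the ascription in the quoted test exists because `Properties.put` takes `Object`, so the Scala `Float` has to be boxed to `java.lang.Float`; either form below compiles (the ascription goes through `Predef.float2Float`), and the choice is stylistic. The string key is a stand-in for `LogConfig.MinCleanableDirtyRatioProp`.

```scala
import java.{lang => jl}
import java.util.Properties

object BoxingSketch extends App {
  val logProps = new Properties()
  // Both forms box the Scala Float into a java.lang.Float before the Object-typed put:
  logProps.put("min.cleanable.dirty.ratio", 0.1f: jl.Float)          // type ascription
  logProps.put("min.cleanable.dirty.ratio", jl.Float.valueOf(0.1f))  // explicit valueOf
  println(logProps.get("min.cleanable.dirty.ratio"))                 // prints 0.1
}
```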
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170278078 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala --- @@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private val preferredHosts = LocationStrategies.PreferConsistent + private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) { +val mockTime = new MockTime() +// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api +val logs = new Pool[TopicAndPartition, Log]() +val logDir = kafkaTestUtils.brokerLogDir +val dir = new java.io.File(logDir, topic + "-" + partition) --- End diff -- Import `File`, other `java.*` classes? maybe I'm missing a name conflict. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170277915 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala --- @@ -172,57 +187,138 @@ private[spark] class KafkaRDD[K, V]( override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = { val part = thePart.asInstanceOf[KafkaRDDPartition] -assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) +require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) if (part.fromOffset == part.untilOffset) { logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " + s"skipping ${part.topic} ${part.partition}") Iterator.empty } else { - new KafkaRDDIterator(part, context) + logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + +s"offsets ${part.fromOffset} -> ${part.untilOffset}") + if (compacted) { +new CompactedKafkaRDDIterator[K, V]( + part, + context, + kafkaParams, + useConsumerCache, + pollTimeout, + cacheInitialCapacity, + cacheMaxCapacity, + cacheLoadFactor +) + } else { +new KafkaRDDIterator[K, V]( + part, + context, + kafkaParams, + useConsumerCache, + pollTimeout, + cacheInitialCapacity, + cacheMaxCapacity, + cacheLoadFactor +) + } } } +} - /** - * An iterator that fetches messages directly from Kafka for the offsets in partition. - * Uses a cached consumer where possible to take advantage of prefetching - */ - private class KafkaRDDIterator( - part: KafkaRDDPartition, - context: TaskContext) extends Iterator[ConsumerRecord[K, V]] { - -logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " + - s"offsets ${part.fromOffset} -> ${part.untilOffset}") - -val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] +/** + * An iterator that fetches messages directly from Kafka for the offsets in partition. + * Uses a cached consumer where possible to take advantage of prefetching + */ +private class KafkaRDDIterator[K, V]( + part: KafkaRDDPartition, + context: TaskContext, + kafkaParams: ju.Map[String, Object], + useConsumerCache: Boolean, + pollTimeout: Long, + cacheInitialCapacity: Int, + cacheMaxCapacity: Int, + cacheLoadFactor: Float +) extends Iterator[ConsumerRecord[K, V]] { + + val groupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String] + + context.addTaskCompletionListener{ context => closeIfNeeded() } --- End diff -- This could be `...(_ => closeIfNeeded())` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170279150 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/KafkaRDD.scala --- @@ -87,47 +89,60 @@ private[spark] class KafkaRDD[K, V]( }.toArray } - override def count(): Long = offsetRanges.map(_.count).sum + override def count(): Long = +if (compacted) { + super.count() +} else { + offsetRanges.map(_.count).sum +} override def countApprox( timeout: Long, confidence: Double = 0.95 - ): PartialResult[BoundedDouble] = { -val c = count -new PartialResult(new BoundedDouble(c, 1.0, c, c), true) - } + ): PartialResult[BoundedDouble] = +if (compacted) { + super.countApprox(timeout, confidence) +} else { + val c = count + new PartialResult(new BoundedDouble(c, 1.0, c, c), true) +} - override def isEmpty(): Boolean = count == 0L + override def isEmpty(): Boolean = +if (compacted) { + super.isEmpty() +} else { + count == 0L +} - override def take(num: Int): Array[ConsumerRecord[K, V]] = { -val nonEmptyPartitions = this.partitions - .map(_.asInstanceOf[KafkaRDDPartition]) - .filter(_.count > 0) + override def take(num: Int): Array[ConsumerRecord[K, V]] = +if (compacted) { + super.take(num) +} else { + val nonEmptyPartitions = this.partitions +.map(_.asInstanceOf[KafkaRDDPartition]) +.filter(_.count > 0) -if (num < 1 || nonEmptyPartitions.isEmpty) { - return new Array[ConsumerRecord[K, V]](0) -} + if (num < 1 || nonEmptyPartitions.isEmpty) { --- End diff -- I guess you could check `num < 1` before the map/filter, but it's trivial. You could write `return Array.empty[ConsumerRecord[K,V]]` too; again trivial. Since this is existing code I could see not touching it as well. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r170296358 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/HeapMemoryAllocator.java --- @@ -58,7 +58,8 @@ public MemoryBlock allocate(long size) throws OutOfMemoryError { final long[] array = arrayReference.get(); if (array != null) { assert (array.length * 8L >= size); - MemoryBlock memory = new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size); + MemoryBlock memory = +new OnHeapMemoryBlock(array, Platform.LONG_ARRAY_OFFSET, size); --- End diff -- sure, done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20663: [SPARK-23475][UI][FOLLOWUP] Refactor AllStagesPage in or...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20663 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87631/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20663: [SPARK-23475][UI][FOLLOWUP] Refactor AllStagesPage in or...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20663 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20663: [SPARK-23475][UI][FOLLOWUP] Refactor AllStagesPage in or...
Github user vanzin commented on the issue: https://github.com/apache/spark/pull/20663 Could you file a separate bug for this cleanup? Thx --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20647: [SPARK-23303][SQL] improve the explain result for...
Github user marmbrus commented on a diff in the pull request: https://github.com/apache/spark/pull/20647#discussion_r170185948 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala --- @@ -77,31 +79,32 @@ class MicroBatchExecution( sparkSession.sqlContext.conf.disabledV2StreamingMicroBatchReaders.split(",") val _logicalPlan = analyzedPlan.transform { - case streamingRelation@StreamingRelation(dataSourceV1, sourceName, output) => -toExecutionRelationMap.getOrElseUpdate(streamingRelation, { + case s @ StreamingRelation(dsV1, sourceName, output) => --- End diff -- If you are touching that specific code then it's fine to fix the style, but in general I tend to agree that it makes the diff harder to read and the commit harder to back-port if you include spurious changes. I've even seen guidelines that specifically prohibit fixing style just to fix style since it obfuscates the history. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20658: [SPARK-23488][python] Add missing catalog methods to pyt...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20658 **[Test build #87627 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87627/testReport)** for PR 20658 at commit [`d7e03cd`](https://github.com/apache/spark/commit/d7e03cd507b58fda8830b580390f5224ee7c8d65). * This patch **fails Python style tests**. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20658: [SPARK-23488][python] Add missing catalog methods to pyt...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20658 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87627/ Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20647 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20658: [SPARK-23488][python] Add missing catalog methods to pyt...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20658 **[Test build #87627 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87627/testReport)** for PR 20658 at commit [`d7e03cd`](https://github.com/apache/spark/commit/d7e03cd507b58fda8830b580390f5224ee7c8d65). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20647 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/87624/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20658: [SPARK-23488][python] Add missing catalog methods to pyt...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/20658 Merged build finished. Test FAILed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20658: [SPARK-23488][python] Add missing catalog methods to pyt...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20658 ok to test --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20647: [SPARK-23303][SQL] improve the explain result for data s...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/20647 **[Test build #87624 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87624/testReport)** for PR 20647 at commit [`dbee281`](https://github.com/apache/spark/commit/dbee2813accd4c8f5937b28eb9142cc6a50f8c6a). * This patch passes all tests. * This patch merges cleanly. * This patch adds no public classes. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20648: [SPARK-23448][SQL] JSON parser should return partial row...
Github user HyukjinKwon commented on the issue: https://github.com/apache/spark/pull/20648 +1 for disallowing it anyway if it was Wenchen's opinion too. Please go ahead. Will help double check anyway. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19222 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/testing-k8s-prb-make-spark-distribution/1020/ Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r170296478 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -45,38 +44,149 @@ */ public static final int FREED_IN_ALLOCATOR_PAGE_NUMBER = -3; - private final long length; + @Nullable + protected Object obj; + + protected long offset; + + protected long length; /** * Optional page number; used when this MemoryBlock represents a page allocated by a - * TaskMemoryManager. This field is public so that it can be modified by the TaskMemoryManager, - * which lives in a different package. + * TaskMemoryManager. This field can be updated using setPageNumber method so that + * this can be modified by the TaskMemoryManager, which lives in a different package. */ - public int pageNumber = NO_PAGE_NUMBER; + private int pageNumber = NO_PAGE_NUMBER; public MemoryBlock(@Nullable Object obj, long offset, long length) { -super(obj, offset); +this.obj = obj; +this.offset = offset; this.length = length; } + public MemoryBlock() { +this(null, 0, 0); + } + + public final Object getBaseObject() { +return obj; + } + + public final long getBaseOffset() { +return offset; + } + + public void resetObjAndOffset() { +this.obj = null; +this.offset = 0; + } + /** * Returns the size of the memory block. */ - public long size() { + public final long size() { return length; } - /** - * Creates a memory block pointing to the memory used by the long array. - */ - public static MemoryBlock fromLongArray(final long[] array) { -return new MemoryBlock(array, Platform.LONG_ARRAY_OFFSET, array.length * 8L); + public final void setPageNumber(int pageNum) { +pageNumber = pageNum; + } + + public final int getPageNumber() { +return pageNumber; } /** * Fills the memory block with the specified byte value. 
*/ - public void fill(byte value) { + public final void fill(byte value) { Platform.setMemory(obj, offset, length, value); } + + /** + * Instantiate MemoryBlock for given object type with new offset + */ + public final static MemoryBlock allocateFromObject(Object obj, long offset, long length) { +MemoryBlock mb = null; +if (obj instanceof byte[]) { + byte[] array = (byte[])obj; + mb = new ByteArrayMemoryBlock(array, offset, length); +} else if (obj instanceof long[]) { + long[] array = (long[])obj; + mb = new OnHeapMemoryBlock(array, offset, length); +} else if (obj == null) { + // we assume that to pass null pointer means off-heap + mb = new OffHeapMemoryBlock(offset, length); +} else { + throw new UnsupportedOperationException(obj.getClass() + " is not supported now"); +} +return mb; + } + + /** + * Instantiate the same type of MemoryBlock with new offset and size + */ + public abstract MemoryBlock allocate(long offset, long size); + + + public abstract int getInt(long offset); + + public abstract void putInt(long offset, int value); + + public abstract boolean getBoolean(long offset); + + public abstract void putBoolean(long offset, boolean value); + + public abstract byte getByte(long offset); + + public abstract void putByte(long offset, byte value); + + public abstract short getShort(long offset); + + public abstract void putShort(long offset, short value); + + public abstract long getLong(long offset); + + public abstract void putLong(long offset, long value); + + public abstract float getFloat(long offset); + + public abstract void putFloat(long offset, float value); + + public abstract double getDouble(long offset); + + public abstract void putDouble(long offset, double value); + + public abstract Object getObjectVolatile(long offset); --- End diff -- Yeah, dropped --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
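Editor's note: for readers following the API shape, a usage sketch of the `allocateFromObject` factory quoted in the diff above (these classes are introduced by this PR, not a released API; the off-heap address below is just a placeholder):

```scala
import org.apache.spark.unsafe.Platform
import org.apache.spark.unsafe.memory.MemoryBlock

// byte[] -> ByteArrayMemoryBlock, long[] -> OnHeapMemoryBlock, null -> OffHeapMemoryBlock;
// any other backing object throws UnsupportedOperationException, per the factory above.
val bytes = new Array[Byte](32)
val longs = new Array[Long](4)

val fromBytes = MemoryBlock.allocateFromObject(bytes, Platform.BYTE_ARRAY_OFFSET, bytes.length)
val fromLongs = MemoryBlock.allocateFromObject(longs, Platform.LONG_ARRAY_OFFSET, longs.length * 8L)
val offHeap   = MemoryBlock.allocateFromObject(null, 0L /* raw address placeholder */, 0L)
```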
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user kiszk commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r170296403 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/MemoryBlock.java --- @@ -22,10 +22,9 @@ import org.apache.spark.unsafe.Platform; /** - * A consecutive block of memory, starting at a {@link MemoryLocation} with a fixed size. + * A declaration of interfaces of MemoryBlock classes . --- End diff -- Thanks, done --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user SparkQA commented on the issue: https://github.com/apache/spark/pull/19222 **[Test build #87633 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/87633/testReport)** for PR 19222 at commit [`5e3afd1`](https://github.com/apache/spark/commit/5e3afd11a2dc76d2cd23264b4052abbf3f5d7e9d). --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #19222: [SPARK-10399][CORE][SQL] Introduce multiple MemoryBlocks...
Github user AmplabJenkins commented on the issue: https://github.com/apache/spark/pull/19222 Merged build finished. Test PASSed. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/20553 also cc @cloud-fan @jerryshao --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20664: [SPARK-23496][CORE] Locality of coalesced partiti...
Github user mgaido91 commented on a diff in the pull request: https://github.com/apache/spark/pull/20664#discussion_r170279656 --- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala --- @@ -1129,6 +1129,36 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext { }.collect() } + test("SPARK-23496: order of input partitions can result in severe skew in coalesce") { --- End diff -- I see, thanks, sorry, I missed it --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #19222: [SPARK-10399][CORE][SQL] Introduce multiple Memor...
Github user cloud-fan commented on a diff in the pull request: https://github.com/apache/spark/pull/19222#discussion_r170305944 --- Diff: common/unsafe/src/main/java/org/apache/spark/unsafe/memory/ByteArrayMemoryBlock.java --- @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.unsafe.memory; + +import org.apache.spark.unsafe.Platform; + +/** + * A consecutive block of memory with a byte array on Java heap. + */ +public final class ByteArrayMemoryBlock extends MemoryBlock { + + private final byte[] array; + + public ByteArrayMemoryBlock(byte[] obj, long offset, long length) { +super(obj, offset, length); +this.array = obj; + } + + @Override + public MemoryBlock allocate(long offset, long size) { +return new ByteArrayMemoryBlock(array, offset, size); + } + + public byte[] getByteArray() { return array; } + + /** + * Creates a memory block pointing to the memory used by the byte array. + */ + public static ByteArrayMemoryBlock fromArray(final byte[] array) { +return new ByteArrayMemoryBlock(array, Platform.BYTE_ARRAY_OFFSET, array.length); + } + + + public final int getInt(long offset) { +// UTF8String.getPrefix() assumes data is 4-byte aligned +assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= ((array.length + 3) / 4) * 4); +return Platform.getInt(array, offset); + } + + public final void putInt(long offset, int value) { +assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= array.length); +Platform.putInt(array, offset, value); + } + + public final boolean getBoolean(long offset) { +assert(offset + 1 - Platform.BYTE_ARRAY_OFFSET <= array.length); +return Platform.getBoolean(array, offset); + } + + public final void putBoolean(long offset, boolean value) { +assert(offset + 1 - Platform.BYTE_ARRAY_OFFSET <= array.length); +Platform.putBoolean(array, offset, value); + } + + public final byte getByte(long offset) { +return array[(int)(offset - Platform.BYTE_ARRAY_OFFSET)]; + } + + public final void putByte(long offset, byte value) { +array[(int)(offset - Platform.BYTE_ARRAY_OFFSET)] = value; + } + + public final short getShort(long offset) { +assert(offset + 2 - Platform.BYTE_ARRAY_OFFSET <= array.length); +return Platform.getShort(array, offset); + } + + public final void putShort(long offset, short value) { +assert(offset + 2 - Platform.BYTE_ARRAY_OFFSET <= array.length); +Platform.putShort(array, offset, value); + } + + public final long getLong(long offset) { +// UTF8String.getPrefix() assumes data is 8-byte aligned +assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= ((array.length + 7) / 8) * 8); +return Platform.getLong(array, offset); + } + + public final void putLong(long offset, long value) { +assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= array.length); +Platform.putLong(array, offset, value); + 
} + + public final float getFloat(long offset) { +assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= array.length); +return Platform.getFloat(array, offset); + } + + public final void putFloat(long offset, float value) { +assert(offset + 4 - Platform.BYTE_ARRAY_OFFSET <= array.length); +Platform.putFloat(array, offset, value); + } + + public final double getDouble(long offset) { +assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= array.length); +return Platform.getDouble(array, offset); + } + + public final void putDouble(long offset, double value) { +assert(offset + 8 - Platform.BYTE_ARRAY_OFFSET <= array.length); +Platform.putDouble(array, offset, value); + } + + public final void copyFrom(byte[] src, long srcOffset, long dstOffset, long length) { --- End
[GitHub] spark issue #20553: [SPARK-23285][K8S] Add a config property for specifying ...
Github user jiangxb1987 commented on the issue: https://github.com/apache/spark/pull/20553 IIUC the `spark.kubernetes.executor.cores` here is just a special case of `spark.executor.cores` for the k8s backend; you will still have to handle float values if you read the value of `spark.kubernetes.executor.cores`, for instance, in dynamic allocation. If that is the case, I don't see much benefit in bringing in a new conf here. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
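Editor's note: a sketch of the fallback-style read being described (a hypothetical helper, not the PR's code): prefer the k8s-specific property when present, otherwise fall back to `spark.executor.cores`, and normalize millicpu strings so callers such as dynamic allocation get a plain number.

```scala
import org.apache.spark.SparkConf

// Hypothetical helper, not the PR's implementation: resolve the executor cpu request as a
// Double, preferring the k8s-specific property and accepting Kubernetes "500m"-style values.
def executorCpuRequest(conf: SparkConf): Double = {
  val raw = conf.getOption("spark.kubernetes.executor.cores")
    .orElse(conf.getOption("spark.executor.cores"))
    .getOrElse("1")
  if (raw.endsWith("m")) raw.dropRight(1).toDouble / 1000.0 else raw.toDouble
}

// e.g. conf.set("spark.kubernetes.executor.cores", "500m") resolves to 0.5
```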
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170278317 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala --- @@ -162,17 +162,22 @@ private[kafka010] class KafkaTestUtils extends Logging { } /** Create a Kafka topic and wait until it is propagated to the whole cluster */ - def createTopic(topic: String, partitions: Int): Unit = { -AdminUtils.createTopic(zkUtils, topic, partitions, 1) + def createTopic(topic: String, partitions: Int, config: Properties): Unit = { +AdminUtils.createTopic(zkUtils, topic, partitions, 1, config) // wait until metadata is propagated (0 until partitions).foreach { p => waitUntilMetadataIsPropagated(topic, p) } } + /** Create a Kafka topic and wait until it is propagated to the whole cluster */ + def createTopic(topic: String, partitions: Int): Unit = { +createTopic(topic, partitions, new Properties) + } + /** Create a Kafka topic and wait until it is propagated to the whole cluster */ def createTopic(topic: String): Unit = { -createTopic(topic, 1) +createTopic(topic, 1, new Properties) --- End diff -- Nit: `new Properties()` --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170279504 --- Diff: external/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaRDDSuite.scala --- @@ -64,6 +69,41 @@ class KafkaRDDSuite extends SparkFunSuite with BeforeAndAfterAll { private val preferredHosts = LocationStrategies.PreferConsistent + private def compactLogs(topic: String, partition: Int, messages: Array[(String, String)]) { +val mockTime = new MockTime() +// LogCleaner in 0.10 version of Kafka is still expecting the old TopicAndPartition api +val logs = new Pool[TopicAndPartition, Log]() +val logDir = kafkaTestUtils.brokerLogDir +val dir = new java.io.File(logDir, topic + "-" + partition) +dir.mkdirs() +val logProps = new ju.Properties() +logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact) +logProps.put(LogConfig.MinCleanableDirtyRatioProp, 0.1f: java.lang.Float) +val log = new Log( + dir, + LogConfig(logProps), + 0L, + mockTime.scheduler, + mockTime +) +messages.foreach { case (k, v) => +val msg = new ByteBufferMessageSet( --- End diff -- Unindent one level? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20572: [SPARK-17147][STREAMING][KAFKA] Allow non-consecu...
Github user srowen commented on a diff in the pull request: https://github.com/apache/spark/pull/20572#discussion_r170278931 --- Diff: external/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/CachedKafkaConsumer.scala --- @@ -71,25 +69,62 @@ class CachedKafkaConsumer[K, V] private( } if (!buffer.hasNext()) { poll(timeout) } -assert(buffer.hasNext(), +require(buffer.hasNext(), s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") var record = buffer.next() if (record.offset != offset) { logInfo(s"Buffer miss for $groupId $topic $partition $offset") seek(offset) poll(timeout) - assert(buffer.hasNext(), + require(buffer.hasNext(), s"Failed to get records for $groupId $topic $partition $offset after polling for $timeout") record = buffer.next() - assert(record.offset == offset, -s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset") + require(record.offset == offset, +s"Got wrong record for $groupId $topic $partition even after seeking to offset $offset " + + s"got offset ${record.offset} instead. If this is a compacted topic, consider enabling " + + "spark.streaming.kafka.allowNonConsecutiveOffsets" + ) } nextOffset = offset + 1 record } + /** + * Start a batch on a compacted topic + */ + def compactedStart(offset: Long, timeout: Long): Unit = { +logDebug(s"compacted start $groupId $topic $partition starting $offset") +// This seek may not be necessary, but it's hard to tell due to gaps in compacted topics +if (offset != nextOffset) { + logInfo(s"Initial fetch for compacted $groupId $topic $partition $offset") + seek(offset) + poll(timeout) +} + } + + /** + * Get the next record in the batch from a compacted topic. + * Assumes compactedStart has been called first, and ignores gaps. + */ + def compactedNext(timeout: Long): ConsumerRecord[K, V] = { +if (!buffer.hasNext()) { poll(timeout) } --- End diff -- Nit: I'd expand this onto two lines --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
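Editor's note: the two methods in the quoted diff form a small protocol: `compactedStart` positions the consumer once, and `compactedNext` then returns records without assuming consecutive offsets. A heavily hedged driver sketch follows; the names and loop shape are illustrative, the real `CompactedKafkaRDDIterator` in this PR buffers one record ahead instead of this simple loop, and the sketch assumes the partition still holds a record at or beyond `untilOffset`. (`CachedKafkaConsumer` is package-private, so code like this would live in the `org.apache.spark.streaming.kafka010` package.)

```scala
import scala.collection.mutable.ArrayBuffer
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.streaming.kafka010.CachedKafkaConsumer

// Illustrative only: drive the compactedStart/compactedNext protocol for one offset range.
def readCompactedRange[K, V](
    consumer: CachedKafkaConsumer[K, V],
    fromOffset: Long,
    untilOffset: Long,
    pollTimeout: Long): Seq[ConsumerRecord[K, V]] = {
  consumer.compactedStart(fromOffset, pollTimeout)
  val out = ArrayBuffer.empty[ConsumerRecord[K, V]]
  var record = consumer.compactedNext(pollTimeout)
  // Compacted topics can have offset gaps, so stop on the offset bound, not a fixed count.
  while (record.offset < untilOffset) {
    out += record
    record = consumer.compactedNext(pollTimeout)
  }
  out.toSeq
}
```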