[GitHub] spark pull request: SPARK-3874: Provide stable TaskContext API
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/2803#issuecomment-59161674

[QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/382/consoleFull) for PR 2803 at commit [`56d5b7a`](https://github.com/apache/spark/commit/56d5b7a703afb2529d969ffa0664f6c601186fb0).
* This patch merges cleanly.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-3568 [mllib] add ranking metrics
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2667#discussion_r18875997

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.evaluation
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+
+/**
+ * ::Experimental::
+ * Evaluator for ranking algorithms.
+ *
+ * @param predictionAndLabels an RDD of (predicted ranking, ground truth set) pairs.
+ */
+@Experimental
+class RankingMetrics[T: ClassTag](predictionAndLabels: RDD[(Array[T], Array[T])]) {
+
+  /**
+   * Returns the precsion@k for each query
+   */
+  lazy val precAtK: RDD[Array[Double]] = predictionAndLabels.map {case (pred, lab)=>
+    val labSet = lab.toSet
+    val n = pred.length
+    val topKPrec = Array.fill[Double](n)(0.0)
+    var (i, cnt) = (0, 0)
+
+    while (i < n) {
+      if (labSet.contains(pred(i))) {
+        cnt += 1
+      }
+      topKPrec(i) = cnt.toDouble / (i + 1)
+      i += 1
+    }
+    topKPrec
+  }
+
+  /**
+   * @param k the position to compute the truncated precision
+   * @return the average precision at the first k ranking positions
+   */
+  def precision(k: Int): Double = precAtK.map {topKPrec =>
--- End diff --

If `k` is given, we only need to compute `topKPrec(k-1)` for each record. We can do:

1. compute on demand; every call requires a pass over the data.
2. compute average `prec@k` for all possible k, cache the result, and then simply return the cached result.

The first approach may be good for this PR. We can optimize it later.
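To make the first (on-demand) option concrete, here is a small sketch of precision@k for a single query. It is written in Python purely for illustration (the PR itself is Scala), and the function name is hypothetical. It also reproduces the diff's truncation rule: when fewer than k items are predicted, `topKPrec(n - 1) * n / k` reduces to dividing the hit count by k.

```python
def precision_at_k(pred, label_set, k):
    # Precision@k for one query: the fraction of the top-k predicted items
    # that appear in the ground-truth set. If fewer than k items were
    # predicted, the hit count is still divided by k, matching the diff's
    # fallback topKPrec(n - 1) * n / k == cnt / k.
    top = pred[:k]
    hits = sum(1 for item in top if item in label_set)
    return hits / k
```

Averaging this value over all queries (one pass per call) is the on-demand variant mengxr describes.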
[GitHub] spark pull request: SPARK-3568 [mllib] add ranking metrics
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2667#discussion_r18876000

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,125 @@
+  /**
+   * Returns the average precision for each query
+   */
+  lazy val avePrec: RDD[Double] = predictionAndLabels.map {case (pred, lab) =>
+    val labSet = lab.toSet
+    var (i, cnt, precSum) = (0, 0, 0.0)
+    val n = pred.length
+
+    while (i < n) {
+      if (labSet.contains(pred(i))) {
+        cnt += 1
+        precSum += cnt.toDouble / (i + 1)
+      }
+      i += 1
+    }
+    precSum / labSet.size
+  }
+
+  /**
+   * Returns the mean average precision (MAP) of all the queries
+   */
+  lazy val meanAvePrec: Double = avePrec.mean
--- End diff --

`meanAvePrec` is not a common acronym for MAP. Let's use `meanAveragePrecision`.
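For reference, the quantity being renamed here is the standard mean average precision. A minimal Python sketch (illustrative only; function names are not from the PR) of the computation in the diff:

```python
def average_precision(pred, label_set):
    # Average precision for one query: accumulate precision at every hit
    # position, then divide by the number of ground-truth items, as the
    # diff's precSum / labSet.size does.
    hits, prec_sum = 0, 0.0
    for i, item in enumerate(pred):
        if item in label_set:
            hits += 1
            prec_sum += hits / (i + 1)
    return prec_sum / len(label_set)

def mean_average_precision(prediction_and_labels):
    # MAP: the mean of the per-query average precisions.
    aps = [average_precision(pred, set(lab)) for pred, lab in prediction_and_labels]
    return sum(aps) / len(aps)
```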
[GitHub] spark pull request: SPARK-3568 [mllib] add ranking metrics
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2667#discussion_r18875991

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,100 @@
+@Experimental
+class RankingMetrics(predictionAndLabels: RDD[(Array[Double], Array[Double])]) {
+
+  /**
+   * Returns the precsion@k for each query
+   */
+  lazy val precAtK: RDD[Array[Double]] = predictionAndLabels.map {case (pred, lab)=>
--- End diff --

Should it be private? Think about what users call it for. Even if we make it public, users may still need some aggregate metrics out of it. For the first version, I think it is safe to provide only the following:

1. `precisionAt(k: Int): Double`
2. `ndcgAt(k: Int): Double`
3. `meanAveragePrecision`

Also, `{case (pred, lab)=>` should be `{ case (pred, lab) =>` (spaces after `{` and after `)`).
[GitHub] spark pull request: SPARK-3568 [mllib] add ranking metrics
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2667#discussion_r18875998

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,125 @@
+  /**
+   * Returns the average precision for each query
+   */
+  lazy val avePrec: RDD[Double] = predictionAndLabels.map {case (pred, lab) =>
+    val labSet = lab.toSet
+    var (i, cnt, precSum) = (0, 0, 0.0)
+    val n = pred.length
+
+    while (i < n) {
+      if (labSet.contains(pred(i))) {
+        cnt += 1
+        precSum += cnt.toDouble / (i + 1)
+      }
+      i += 1
+    }
+    precSum / labSet.size
+  }
--- End diff --

Same as `precAtK`. We should make it private or put it inside `meanAveragePrecision`.
[GitHub] spark pull request: SPARK-3568 [mllib] add ranking metrics
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2667#discussion_r18875996

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,125 @@
+  /**
+   * @param k the position to compute the truncated precision
+   * @return the average precision at the first k ranking positions
+   */
+  def precision(k: Int): Double = precAtK.map {topKPrec =>
--- End diff --

Need a one-sentence summary of the method. Also need a link to the paper that describes how to handle the case when we don't have k items.
[GitHub] spark pull request: SPARK-3568 [mllib] add ranking metrics
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2667#discussion_r18875995

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,125 @@
+  lazy val precAtK: RDD[Array[Double]] = predictionAndLabels.map {case (pred, lab)=>
+    val labSet = lab.toSet
+    val n = pred.length
+    val topKPrec = Array.fill[Double](n)(0.0)
+    var (i, cnt) = (0, 0)
--- End diff --

~~~
var i = 0
var cnt = 0
~~~
[GitHub] spark pull request: SPARK-3568 [mllib] add ranking metrics
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2667#discussion_r18875993

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,125 @@
+    val labSet = lab.toSet
+    val n = pred.length
+    val topKPrec = Array.fill[Double](n)(0.0)
--- End diff --

`new Array[Double](n)`
[GitHub] spark pull request: SPARK-3568 [mllib] add ranking metrics
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2667#discussion_r18875990

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,125 @@
+package org.apache.spark.mllib.evaluation
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.SparkContext._
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.rdd.RDD
+
+
+
--- End diff --

Remove the extra empty lines.
[GitHub] spark pull request: SPARK-3568 [mllib] add ranking metrics
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2667#discussion_r18876002

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,125 @@
+  /**
+   * Returns the normalized discounted cumulative gain for each query
+   */
+  lazy val ndcgAtK: RDD[Array[Double]] = predictionAndLabels.map {case (pred, lab) =>
--- End diff --

Ditto. Make it private or put it inside `ndcgAt(k)`.
[GitHub] spark pull request: SPARK-3568 [mllib] add ranking metrics
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2667#discussion_r18876004

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,125 @@
+  /**
+   * Returns the normalized discounted cumulative gain for each query
+   */
+  lazy val ndcgAtK: RDD[Array[Double]] = predictionAndLabels.map {case (pred, lab) =>
+    val labSet = lab.toSet
+    val labSetSize = labSet.size
+    val n = math.max(pred.length, labSetSize)
+    val topKNdcg = Array.fill[Double](n)(0.0)
+    var (maxDcg, dcg, i) = (0.0, 0.0, 0)
+    while (i < n) {
+      /* Calculate 1/log2(i + 2) */
+      val gain = math.log(2) / math.log(i + 2)
+      if (labSet.contains(pred(i))) {
+        dcg += gain
+      }
+      if (i < labSetSize) {
+        maxDcg += gain
+      }
+      topKNdcg(i) = dcg / maxDcg
+      i += 1
+    }
+    topKNdcg
+  }
+
+  /**
+   * @param k the position to compute the truncated ndcg
--- End diff --

Need a one-sentence summary of the method. Also need a link to the paper that describes how to handle the case when we don't have k items.
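For readers unfamiliar with the metric under review, here is a binary-relevance NDCG@k sketch in Python (illustrative only; the PR is Scala and the function name is hypothetical). It follows the diff's scheme: a relevant item at position i contributes a gain of 1/log2(i + 2), and the ideal DCG places all ground-truth items at the top. The bounds check on `pred` is an added guard for prediction lists shorter than the ground-truth set.

```python
import math

def ndcg_at_k(pred, label_set, k):
    # DCG over the first min(k, max(|pred|, |label_set|)) positions, divided
    # by the ideal DCG obtained by ranking all relevant items first.
    dcg, max_dcg = 0.0, 0.0
    n = min(k, max(len(pred), len(label_set)))
    for i in range(n):
        gain = 1.0 / math.log2(i + 2)
        if i < len(pred) and pred[i] in label_set:
            dcg += gain
        if i < len(label_set):
            max_dcg += gain
    return dcg / max_dcg
```

A perfect ranking yields 1.0; pushing a relevant item down the list discounts its gain logarithmically.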
[GitHub] spark pull request: SPARK-3568 [mllib] add ranking metrics
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2667#discussion_r18876036

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/evaluation/RankingMetrics.scala ---
@@ -0,0 +1,100 @@
+@Experimental
+class RankingMetrics(predictionAndLabels: RDD[(Array[Double], Array[Double])]) {
+
+  /**
+   * Returns the precsion@k for each query
+   */
+  lazy val precAtK: RDD[Array[Double]] = predictionAndLabels.map {case (pred, lab)=>
--- End diff --

By the way, I think it is better to use `precisionAt` instead of `precision` to emphasize that the parameter is a position. We use `precision(class)` in `MulticlassMetrics`, and users may get confused if we use the same name.
[GitHub] spark pull request: SPARK-3874: Provide stable TaskContext API
Github user ScrapCodes commented on a diff in the pull request:
https://github.com/apache/spark/pull/2803#discussion_r18876515

--- Diff: project/MimaExcludes.scala ---
@@ -50,7 +50,11 @@ object MimaExcludes {
           "org.apache.spark.mllib.stat.MultivariateStatisticalSummary.normL2"),
         // MapStatus should be private[spark]
         ProblemFilters.exclude[IncompatibleTemplateDefProblem](
-          "org.apache.spark.scheduler.MapStatus")
+          "org.apache.spark.scheduler.MapStatus"),
+        // TaskContext was promoted to an abstract class
+        ProblemFilters.exclude[AbstractClassProblem](
+          "org.apache.spark.TaskContext")
--- End diff --

I felt we could exclude this generally; it did not make sense to me to report these errors for classes marked DeveloperApi or Experimental. Thoughts?
[GitHub] spark pull request: [SPARK-3923] Decrease Akka heartbeat interval ...
Github user ScrapCodes commented on the pull request: https://github.com/apache/spark/pull/2784#issuecomment-59163637 Thanks, LGTM.
[GitHub] spark pull request: [SPARK-3923] Decrease Akka heartbeat interval ...
Github user ScrapCodes commented on the pull request: https://github.com/apache/spark/pull/2784#issuecomment-59163981 Minor: your PR title looks misleading! :)
[GitHub] spark pull request: [SPARK-3944][Core] Code re-factored as suggest...
GitHub user Shiti opened a pull request: https://github.com/apache/spark/pull/2810 [SPARK-3944][Core] Code re-factored as suggested You can merge this pull request into a Git repository by running: $ git pull https://github.com/Shiti/spark master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2810.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2810 commit 8ee1cc91e1f927b86193ba2e3b2caf64081061d2 Author: shitis ssaxena@gmail.com Date: 2014-10-15T06:37:17Z code refactor
[GitHub] spark pull request: [SPARK-3944][Core] Code re-factored as suggest...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2810#issuecomment-59164110 Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user viper-kun commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-59164199 @vanzin, is it ok to go?
[GitHub] spark pull request: [SPARK-3569][SQL] Add metadata field to Struct...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2701#issuecomment-59165099 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21762/ Test FAILed.
[GitHub] spark pull request: [SPARK-3569][SQL] Add metadata field to Struct...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2701#issuecomment-59165094 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21762/consoleFull) for PR 2701 at commit [`3f49aab`](https://github.com/apache/spark/commit/3f49aab1342fd2d877749c7cce5cfa6bc8ac1fa7). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `case class AttributeReference(` * `case class StructField(` * `class MetadataBuilder `
[GitHub] spark pull request: [SPARK-3904] [SQL] add constant objectinspecto...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/spark/pull/2762#discussion_r18877380 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/hiveUdfs.scala --- @@ -159,7 +162,10 @@ private[hive] case class HiveGenericUdf(functionClassName: String, children: Seq var i = 0 while (i < children.length) { val idx = i - deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set(() => {children(idx).eval(input)}) + deferedObjects(i).asInstanceOf[DeferredObjectAdapter].set( + () => { + children(idx).eval(input) + }, argumentInspectors(i)) --- End diff -- argumentInspector can be set once during deferedObjects creation, rather than once per row during execution.
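The review comment above is a classic loop-invariant hoisting suggestion: the inspector never changes per row, so it should be bound once when the adapter is created, leaving only the cheap per-row thunk to rebind during evaluation. A minimal plain-Python sketch of that pattern — hypothetical class and names, not the actual Spark/Hive code:

```python
class DeferredObjectAdapter:
    """Sketch of the suggestion: bind the loop-invariant inspector once
    at construction; only the per-row thunk is set during evaluation."""
    def __init__(self, inspector):
        self.inspector = inspector   # set once, at creation time
        self.func = None

    def set(self, func):             # called per row: only the cheap part
        self.func = func

    def get(self):
        return self.inspector, self.func()

# Construction time: inspectors are bound exactly once.
adapters = [DeferredObjectAdapter(insp) for insp in ["int", "string"]]

# Per-row time: only the eval thunks are rebound.
row = [7, "spark"]
for i, a in enumerate(adapters):
    a.set(lambda idx=i: row[idx])
print([a.get() for a in adapters])  # [('int', 7), ('string', 'spark')]
```

The payoff is the same as in the diff: work proportional to the number of rows no longer includes re-supplying a value that is fixed for the lifetime of the adapter.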
[GitHub] spark pull request: SPARK-3874: Provide stable TaskContext API
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2803#issuecomment-59166771 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/382/consoleFull) for PR 2803 at commit [`56d5b7a`](https://github.com/apache/spark/commit/56d5b7a703afb2529d969ffa0664f6c601186fb0). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds the following public classes _(experimental)_: * `public abstract class TaskContext implements Serializable `
[GitHub] spark pull request: promote the speed of convert files to RDDS
GitHub user surq opened a pull request: https://github.com/apache/spark/pull/2811 promote the speed of convert files to RDDS About converting files to RDDs, there are 3 loops over the files sequence in the Spark source: 1. files.map(...) 2. files.zip(fileRDDs) 3. files-size.foreach This is very time consuming when there are lots of files, so I made the following correction: the 3 loops over the files sequence are collapsed into only one loop. You can merge this pull request into a Git repository by running: $ git pull https://github.com/surq/spark SPARK-3954 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2811.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2811 commit 739341f543517c15d92219a239ab156a1adcb469 Author: surq s...@asiainfo.com Date: 2014-10-15T07:16:40Z promote the speed of convert files to RDDS
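The PR description above fuses three traversals of the file list (a map, a zip, and a final foreach) into a single pass. A plain-Python sketch of the fusion — `make_rdd` is a hypothetical stand-in for the per-file RDD construction in `FileInputDStream`, not the actual Spark code:

```python
def make_rdd(path):
    # Stand-in for per-file RDD construction (hypothetical helper).
    return ("rdd", path)

files = ["a.txt", "b.txt", "c.txt"]

# Before: three separate traversals of the file list.
file_rdds = [make_rdd(f) for f in files]   # 1. files.map(...)
pairs = list(zip(files, file_rdds))        # 2. files.zip(fileRDDs)
names = [f for f in files]                 # 3. a final pass (e.g. logging)

# After: a single traversal produces all three results.
fused_pairs, fused_names = [], []
for f in files:
    rdd = make_rdd(f)
    fused_pairs.append((f, rdd))
    fused_names.append(f)

assert fused_pairs == pairs and fused_names == names
```

The results are identical; only the number of passes over the (potentially large) file list changes, which is where the claimed speedup comes from.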
[GitHub] spark pull request: promote the speed of convert files to RDDS
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2811#issuecomment-59167172 Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-3904] [SQL] add constant objectinspecto...
Github user gvramana commented on a diff in the pull request: https://github.com/apache/spark/pull/2762#discussion_r18877950 --- Diff: sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala --- @@ -186,6 +230,51 @@ private[hive] trait HiveInspectors { fields.map(f => f.name), fields.map(f => toInspector(f.dataType))) } + def toInspector(expr: Expression): ObjectInspector = expr match { + case Literal(value: String, StringType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.STRING, new hadoopIo.Text(value)) + case Literal(value: Int, IntegerType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.INT, new hadoopIo.IntWritable(value)) + case Literal(value: Double, DoubleType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.DOUBLE, new hiveIo.DoubleWritable(value)) + case Literal(value: Boolean, BooleanType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.BOOLEAN, new hadoopIo.BooleanWritable(value)) + case Literal(value: Long, LongType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.LONG, new hadoopIo.LongWritable(value)) + case Literal(value: Float, FloatType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.FLOAT, new hadoopIo.FloatWritable(value)) + case Literal(value: Short, ShortType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.SHORT, new hiveIo.ShortWritable(value)) + case Literal(value: Byte, ByteType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.BYTE, new hiveIo.ByteWritable(value)) + case Literal(value: Array[Byte], BinaryType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.BINARY, new hadoopIo.BytesWritable(value)) + case Literal(value: java.sql.Date, DateType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.DATE, new hiveIo.DateWritable(value)) + case Literal(value: java.sql.Timestamp, TimestampType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.TIMESTAMP, new hiveIo.TimestampWritable(value)) + case Literal(value: BigDecimal, DecimalType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.DECIMAL, + new hiveIo.HiveDecimalWritable(new HiveDecimal(value.underlying()))) + case Literal(_, NullType) => + PrimitiveObjectInspectorFactory.getPrimitiveWritableConstantObjectInspector( + PrimitiveCategory.VOID, null) + case Literal(_, _) => sys.error("Hive doesn't support the constant complicated type.") --- End diff -- Hive supports constants of a few complicated types, such as List and Map (StandardConstantListObjectInspector.java, StandardConstantMapObjectInspector.java). This is required for functions like percentile_approx(col, array(0.8,0.9)) to work.
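The `toInspector` pattern match quoted above dispatches each constant `Literal` to a type-specific Writable wrapper and falls back to an error for unsupported types. The shape of that dispatch can be sketched in plain Python — the wrapper names below are illustrative stand-ins, not the Hadoop/Hive classes:

```python
# Hypothetical stand-ins for the Writable wrappers in the diff above.
WRAPPERS = {
    int:   ("INT",     lambda v: ("IntWritable", v)),
    float: ("DOUBLE",  lambda v: ("DoubleWritable", v)),
    bool:  ("BOOLEAN", lambda v: ("BooleanWritable", v)),
    str:   ("STRING",  lambda v: ("Text", v)),
}

def to_constant_inspector(value):
    """Mimic the Literal pattern match: pick a primitive category and
    wrap the constant; unsupported types raise, like the sys.error case."""
    for py_type, (category, wrap) in WRAPPERS.items():
        if type(value) is py_type:   # exact match, so bool is not caught by int
            return category, wrap(value)
    raise TypeError("unsupported constant type: %r" % type(value))

print(to_constant_inspector(42))  # ('INT', ('IntWritable', 42))
```

The reviewer's point maps onto the fallback branch: rather than rejecting every non-primitive constant, Hive's constant List and Map inspectors could handle cases like `percentile_approx(col, array(0.8,0.9))`.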
[GitHub] spark pull request: [SPARK-3904] [SQL] add constant objectinspecto...
Github user gvramana commented on the pull request: https://github.com/apache/spark/pull/2762#issuecomment-59168016 Unfortunately :( I have also worked on and implemented the same as part of https://github.com/apache/spark/pull/2802 Anyway, I will rework my pull request to support UDAF and GenericUDAF once your changes are merged.
[GitHub] spark pull request: SPARK-3926 [CORE] Result of JavaRDD.collectAsM...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2805#issuecomment-59174952 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21763/consoleFull) for PR 2805 at commit [`ae1b36f`](https://github.com/apache/spark/commit/ae1b36f57ac876467b23dcc003cc81a8c6881341). * This patch merges cleanly.
[GitHub] spark pull request: SPARK-3926 [CORE] Result of JavaRDD.collectAsM...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2805#issuecomment-59175385 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21763/ Test FAILed.
[GitHub] spark pull request: SPARK-3926 [CORE] Result of JavaRDD.collectAsM...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2805#issuecomment-59175382 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21763/consoleFull) for PR 2805 at commit [`ae1b36f`](https://github.com/apache/spark/commit/ae1b36f57ac876467b23dcc003cc81a8c6881341). * This patch **fails to build**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: promote the speed of convert files to RDDS
Github user jerryshao commented on a diff in the pull request: https://github.com/apache/spark/pull/2811#discussion_r18881912 --- Diff: streaming/src/main/scala/org/apache/spark/streaming/dstream/FileInputDStream.scala --- @@ -27,6 +27,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.rdd.UnionRDD import org.apache.spark.streaming.{StreamingContext, Time} import org.apache.spark.util.TimeStampedHashMap +import scala.collection.mutable.ArrayBuffer --- End diff -- Does your modification use ArrayBuffer? It seems to be unused.
[GitHub] spark pull request: promote the speed of convert files to RDDS
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/2811#issuecomment-59176947 The improvement looks good to me.
[GitHub] spark pull request: promote the speed of convert files to RDDS
Github user jerryshao commented on the pull request: https://github.com/apache/spark/pull/2811#issuecomment-59177082 Besides, would you mind creating a related JIRA and changing the title to match other PRs?
[GitHub] spark pull request: [SPARK-3945]Properties of hive-site.xml is inv...
Github user luogankun commented on the pull request: https://github.com/apache/spark/pull/2800#issuecomment-59183382 @liancheng
[GitHub] spark pull request: [SPARK-3939][SQL] NPE caused by SessionState.o...
GitHub user adrian-wang opened a pull request: https://github.com/apache/spark/pull/2812 [SPARK-3939][SQL] NPE caused by SessionState.out not set in thriftserver2 There would be an NPE caused by SessionState.out not set in thriftserver2. You can merge this pull request into a Git repository by running: $ git pull https://github.com/adrian-wang/spark setcommand Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2812.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2812 commit e89a7c26fd095a1aea380860e0a45f80d5fcbd6b Author: Daoyuan Wang daoyuan.w...@intel.com Date: 2014-10-15T10:22:41Z initiate err and out in HiveThriftServer
[GitHub] spark pull request: [SPARK-3939][SQL] NPE caused by SessionState.o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2812#issuecomment-59186353 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21764/consoleFull) for PR 2812 at commit [`e89a7c2`](https://github.com/apache/spark/commit/e89a7c26fd095a1aea380860e0a45f80d5fcbd6b). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3939][SQL] NPE caused by SessionState.o...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2812#issuecomment-59189745 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21764/ Test FAILed.
[GitHub] spark pull request: [SPARK-3939][SQL] NPE caused by SessionState.o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2812#issuecomment-59189741 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21764/consoleFull) for PR 2812 at commit [`e89a7c2`](https://github.com/apache/spark/commit/e89a7c26fd095a1aea380860e0a45f80d5fcbd6b). * This patch **fails Spark unit tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user mattf commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-59191797 "Well, good luck with adding something like that to HDFS... that is not the responsibility of filesystems." just so we're on the same page, i'm not advocating adding this functionality to HDFS. i believe it should be separate functionality that doesn't live in a spark process, as it's an operational activity. "a directory that is completely managed within Spark (the event log directory) shouldn't also be cleaned up by Spark" we can agree to disagree. you can view a trace log as allocating some amount of disk space that you then have to manage, similar to memory allocations in a program. however, doing that management is more involved than periodically rotating. if you're in nyc for strata we should grab a beer and debate the finer points of resource management.
[GitHub] spark pull request: SPARK-3223 runAsSparkUser cannot change HDFS w...
Github user timothysc commented on the pull request: https://github.com/apache/spark/pull/2126#issuecomment-59201681 I believe @tgravescs is the only committer on this PR.
[GitHub] spark pull request: SPARK-3223 runAsSparkUser cannot change HDFS w...
Github user tgravescs commented on the pull request: https://github.com/apache/spark/pull/2126#issuecomment-59214378 @pwendell @mateiz Any committers that are more familiar with the mesos stuff that could look at this?
[GitHub] spark pull request: improve spark on yarn doc(issus 3629 on jira)
GitHub user ssjssh opened a pull request: https://github.com/apache/spark/pull/2813 improve spark on yarn doc(issus 3629 on jira) Right now running-on-yarn.md starts off with a big list of config options, and only then tells you how to submit an app. It would be better to put that part and the packaging part first, and the config options only at the end. Besides, I added some explanations about yarn-cluster and yarn-client mode. https://issues.apache.org/jira/browse/SPARK-3629 You can merge this pull request into a Git repository by running: $ git pull https://github.com/ssjssh/spark improve-spark-on-yarn-doc Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2813.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2813 commit f8ec44d3304c2c28c176257e6a75102c655ee71d Author: ssjssh ssj970763...@163.com Date: 2014-10-08T15:30:13Z use test to explore spark commit 25a3c127cff84a61cf31ecfdefed9b83cc18dc19 Author: ssjssh ssj970763...@163.com Date: 2014-10-08T15:38:11Z add first test commit c86112f82e5cdca7629906dd2846d48149183556 Author: ssjssh ssj970763...@163.com Date: 2014-10-13T06:14:36Z add own folder commit 92e28e1dcc08db2670b83cba5d3fae4e588644f6 Author: ssjssh ssj970763...@163.com Date: 2014-10-15T14:25:08Z edited to tell how to submit a Spark application on yarn first, followed by the config options for working with yarn. In addition, added some explanations about yarn-cluster mode and yarn-client mode. commit 505a155f4cd52d641a4f87822f5bfcf205a58baf Author: ssjssh ssj970763...@163.com Date: 2014-10-15T14:29:03Z delete test file
[GitHub] spark pull request: improve spark on yarn doc(issus 3629 on jira)
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2813#issuecomment-59215162 Can one of the admins verify this patch?
[GitHub] spark pull request: [SPARK-3944][Core] Code re-factored as suggest...
Github user markhamstra commented on the pull request: https://github.com/apache/spark/pull/2810#issuecomment-59223588 ok to test
[GitHub] spark pull request: SPARK-1209 [CORE] SparkHadoopUtil should not u...
GitHub user srowen opened a pull request: https://github.com/apache/spark/pull/2814 SPARK-1209 [CORE] SparkHadoopUtil should not use package org.apache.hadoop (This is just a look at what completely moving the classes would look like. I know Patrick flagged that as maybe not OK, although, it's private?) You can merge this pull request into a Git repository by running: $ git pull https://github.com/srowen/spark SPARK-1209 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2814.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2814 commit b5b82e28724f94573644a68a794ce33649ea6ea3 Author: Sean Owen so...@cloudera.com Date: 2014-10-13T19:43:36Z Refer to "self-contained" rather than "standalone" apps to avoid confusion with standalone deployment mode. And fix placement of reference to this in MLlib docs. commit ec1b84a331d2cfe1a9b2dc2553a117026728bb01 Author: Sean Owen so...@cloudera.com Date: 2014-10-13T19:48:42Z Move SparkHadoopMapRedUtil / SparkHadoopMapReduceUtil from org.apache.hadoop to org.apache.spark
[GitHub] spark pull request: SPARK-3926 [CORE] Result of JavaRDD.collectAsM...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2805#issuecomment-59226192 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21766/consoleFull) for PR 2805 at commit [`f4717f9`](https://github.com/apache/spark/commit/f4717f972f55dd43b4c84e8c9d3e21ff163e8b9c). * This patch merges cleanly.
[GitHub] spark pull request: SPARK-1209 [CORE] SparkHadoopUtil should not u...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2814#issuecomment-59226216 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21765/consoleFull) for PR 2814 at commit [`ec1b84a`](https://github.com/apache/spark/commit/ec1b84a331d2cfe1a9b2dc2553a117026728bb01). * This patch merges cleanly.
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-59231360 @mattf Always in for a beer but unfortunately I'm not in NY... Also, you mention rotating a lot. This is not rotating. This is cleaning up, as in deleting existing logs. Having an external process do that for you complicates the History Server because now there's the possibility that it will serve stale data - links to jobs that have been cleaned up by that external process, and will result in an error in the UI. (Which is why I mentioned above that an external sweeper without inotify is a no-go.) Also, this is not a trace log. This is application history information. This is not your Linux syslog or Windows event log. This is very application-specific.
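The in-process cleanup vanzin argues for can be sketched as a scheduled task inside the history server: the server evicts an application from its own listing before deleting the log, so the UI never links to data that no longer exists. This is only an illustrative sketch; the names (`EventLogCleaner`, `sweepOnce`, `evictFromUi`, the two knobs) are hypothetical and not the actual SPARK-3562 patch, which reads its settings from SparkConf.

```scala
import java.io.File
import java.util.concurrent.{Executors, TimeUnit}

object EventLogCleaner {
  // Hypothetical knobs; a real patch would read these from SparkConf.
  val maxLogAgeMs: Long = 7L * 24 * 60 * 60 * 1000 // delete logs older than 7 days
  val cleanIntervalS: Long = 60 * 60               // sweep once per hour

  /** One sweep: delete event logs older than `cutoff`, evicting each from the
    * server's own app listing first so the UI never serves stale links. */
  def sweepOnce(logDir: File, cutoff: Long, evictFromUi: String => Unit): Unit = {
    for (f <- Option(logDir.listFiles()).getOrElse(Array.empty[File])
         if f.lastModified() < cutoff) {
      evictFromUi(f.getName) // update in-memory state first...
      f.delete()             // ...then remove the file itself
    }
  }

  /** Schedule periodic sweeps inside the history server process. */
  def startCleaner(logDir: File, evictFromUi: String => Unit): Unit = {
    val pool = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      def run(): Unit =
        sweepOnce(logDir, System.currentTimeMillis() - maxLogAgeMs, evictFromUi)
    }
    pool.scheduleWithFixedDelay(task, cleanIntervalS, cleanIntervalS, TimeUnit.SECONDS)
  }
}
```

Because the eviction callback runs before the delete, an external sweeper (and its stale-link problem) is avoided entirely, which is the point of vanzin's objection.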
[GitHub] spark pull request: [SPARK-3562]Periodic cleanup event logs
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2471#issuecomment-59231417 @viper-kun haven't had a chance to look at the diff again, but it seems there are merge conflicts now.
[GitHub] spark pull request: [SPARK-3944][Core] Code re-factored as suggest...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2810#issuecomment-59235424 Hi @Shiti, could you write a less cryptic PR title? This becomes the git commit summary, so it's recommended that it describe the actual change. Thanks!
[GitHub] spark pull request: SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2814#issuecomment-59237336 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21767/ Test FAILed.
[GitHub] spark pull request: SPARK-3926 [CORE] Result of JavaRDD.collectAsM...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2805#issuecomment-59237316 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21766/consoleFull) for PR 2805 at commit [`f4717f9`](https://github.com/apache/spark/commit/f4717f972f55dd43b4c84e8c9d3e21ff163e8b9c). * This patch **fails MiMa tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: SPARK-3926 [CORE] Result of JavaRDD.collectAsM...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2805#issuecomment-59237321 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21766/ Test FAILed.
[GitHub] spark pull request: [SPARK-3923] Increase Akka heartbeat pause abo...
Github user aarondav commented on the pull request: https://github.com/apache/spark/pull/2784#issuecomment-59237464 Updated, but I think we should always give PRs a name opposite to what they actually do. Keeps things interesting.
[GitHub] spark pull request: [SPARK-3923] Increase Akka heartbeat pause abo...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2784#issuecomment-59237808 above below?
[GitHub] spark pull request: SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce...
Github user shaneknapp commented on the pull request: https://github.com/apache/spark/pull/2814#issuecomment-59239541 jenkins, test this please.
[GitHub] spark pull request: SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce...
Github user srowen commented on the pull request: https://github.com/apache/spark/pull/2814#issuecomment-59239623 Jenkins, test this please.
[GitHub] spark pull request: SPARK-3926 [CORE] Result of JavaRDD.collectAsM...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2805#issuecomment-59241056 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21769/consoleFull) for PR 2805 at commit [`ecb78ee`](https://github.com/apache/spark/commit/ecb78eef6bd00d1d13206fafb6e0d8b4c565ebf3). * This patch merges cleanly.
[GitHub] spark pull request: SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2814#issuecomment-59241716 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21765/ Test FAILed.
[GitHub] spark pull request: SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2814#issuecomment-59241697 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21765/consoleFull) for PR 2814 at commit [`ec1b84a`](https://github.com/apache/spark/commit/ec1b84a331d2cfe1a9b2dc2553a117026728bb01). * This patch **fails MiMa tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: [WIP][SPARK-3795] Heuristics for dynamically s...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2746#discussion_r18908858 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ExecutorAllocationManager.scala --- @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable + +import org.apache.spark.{Logging, SparkException} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend + +/** + * An agent that dynamically allocates and removes executors based on the workload. + * + * The add policy depends on the number of pending tasks. If the queue of pending tasks is not + * drained in N seconds, then new executors are added. If the queue persists for another M + * seconds, then more executors are added and so on. The number added in each round increases + * exponentially from the previous round until an upper bound on the number of executors has + * been reached. + * + * The rationale for the exponential increase is twofold: (1) Executors should be added slowly + * in the beginning in case the number of extra executors needed turns out to be small. 
Otherwise, + * we may add more executors than we need just to remove them later. (2) Executors should be added + * quickly over time in case the maximum number of executors is very high. Otherwise, it will take + * a long time to ramp up under heavy workloads. + * + * The remove policy is simpler: If an executor has been idle for K seconds (meaning it has not + * been scheduled to run any tasks), then it is removed. This requires starting a timer on each + * executor instead of just starting a global one as in the add case. + * + * Both add and remove attempts are retried on failure up to a maximum number of times. + * + * The relevant Spark properties include the following: + * + * spark.dynamicAllocation.enabled - Whether this feature is enabled + * spark.dynamicAllocation.minExecutors - Lower bound on the number of executors + * spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors + * + * spark.dynamicAllocation.addExecutorThreshold - How long before new executors are added (N) + * spark.dynamicAllocation.addExecutorInterval - How often to add new executors (M) + * spark.dynamicAllocation.removeExecutorThreshold - How long before an executor is removed (K) + * + * spark.dynamicAllocation.addExecutorRetryInterval - How often to retry adding executors --- End diff -- A lot of the logic in here seems related to retries. Why do we have to deal with retrying at all? What if we just do individual executor requests one at a time and we don't do any dynamic scaling until we get back the result of the last request? Could this simplify a lot of complexity here? My understanding is that YARN etc. should be able to launch executors on the order of seconds, so we could add ten or more executors per minute... that seems pretty reasonable for the type of workloads this is targeting.
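The exponential add policy described in the scaladoc quoted above can be modeled in a few lines: each round in which the pending-task queue persists doubles the request size, capped by the configured maximum. This is a toy illustration only; `AddPolicy` and `roundsToRequest` are invented names, not the actual ExecutorAllocationManager API.

```scala
import scala.collection.mutable.ArrayBuffer

object AddPolicy {
  /** Sizes of successive executor requests (1, 2, 4, 8, ...) needed to grow
    * from `current` executors up to `maxExecutors`, per the add policy. */
  def roundsToRequest(current: Int, maxExecutors: Int): Seq[Int] = {
    val requests = ArrayBuffer.empty[Int]
    var total = current
    var batch = 1
    while (total < maxExecutors) {
      val add = math.min(batch, maxExecutors - total) // never overshoot the cap
      requests += add
      total += add
      batch *= 2 // exponential ramp-up between rounds
    }
    requests.toSeq
  }
}
```

Starting from zero with a cap of ten, the rounds request 1, 2, 4, then the remaining 3 executors, which shows both behaviors the scaladoc motivates: a small first step in case demand is low, and a fast ramp when the cap is high.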
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18908910
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -83,216 +79,163 @@ object SparkSubmit {
    * (4) the main class for the child
    */
   private[spark] def createLaunchEnv(args: SparkSubmitArguments)
-      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
+      : (mutable.ArrayBuffer[String], mutable.ArrayBuffer[String], Map[String, String], String) = {
     // Values to return
-    val childArgs = new ArrayBuffer[String]()
-    val childClasspath = new ArrayBuffer[String]()
-    val sysProps = new HashMap[String, String]()
+    val childArgs = new mutable.ArrayBuffer[String]()
+    val childClasspath = new mutable.ArrayBuffer[String]()
+    val sysProps = new mutable.HashMap[String, String]()
     var childMainClass = ""

-    // Set the cluster manager
-    val clusterManager: Int = args.master match {
-      case m if m.startsWith("yarn") => YARN
-      case m if m.startsWith("spark") => STANDALONE
-      case m if m.startsWith("mesos") => MESOS
-      case m if m.startsWith("local") => LOCAL
-      case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
-    }
-
-    // Set the deploy mode; default is client mode
-    var deployMode: Int = args.deployMode match {
-      case "client" | null => CLIENT
-      case "cluster" => CLUSTER
-      case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
-    }
-
-    // Because "yarn-cluster" and "yarn-client" encapsulate both the master
-    // and deploy mode, we have some logic to infer the master and deploy mode
-    // from each other if only one is specified, or exit early if they are at odds.
-    if (clusterManager == YARN) {
-      if (args.master == "yarn-standalone") {
-        printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
-        args.master = "yarn-cluster"
-      }
-      (args.master, args.deployMode) match {
-        case ("yarn-cluster", null) =>
-          deployMode = CLUSTER
-        case ("yarn-cluster", "client") =>
-          printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
-        case ("yarn-client", "cluster") =>
-          printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
-        case (_, mode) =>
-          args.master = "yarn-" + Option(mode).getOrElse("client")
-      }
-
+    if (args.clusterManagerFlag == CM_YARN) {
       // Make sure YARN is included in our build if we're trying to use it
       if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
         printErrorAndExit(
           "Could not load YARN classes. " +
           "This copy of Spark may not have been compiled with YARN support.")
       }
-    }
-
-    // The following modes are not supported or applicable
-    (clusterManager, deployMode) match {
-      case (MESOS, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
-      case (_, CLUSTER) if args.isPython =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
-      case (_, CLUSTER) if isShell(args.primaryResource) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
-      case _ =>
+      val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
+      if (!hasHadoopEnv && !Utils.isTesting) {
+        throw new Exception("When running with master '" + args.master + "' " +
+          "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
+      }
     }

     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython) {
       if (args.primaryResource == PYSPARK_SHELL) {
-        args.mainClass = "py4j.GatewayServer"
-        args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
+        args.mainClass = PY4J_GATEWAYSERVER
+        args.childArgs = mutable.ArrayBuffer("--die-on-broken-pipe", "0")
       } else {
         // If a python file is provided, add it to the child arguments and list of files to deploy.
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
-        args.mainClass = "org.apache.spark.deploy.PythonRunner"
-        args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
-        args.files = mergeFileLists(args.files, args.primaryResource)
+        args.mainClass = PYTHON_RUNNER
+        args.childArgs =
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18909075
--- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -33,30 +34,25 @@ import org.apache.spark.util.Utils
  * a layer over the different cluster managers and deploy modes that Spark supports.
  */
 object SparkSubmit {
-
-  // Cluster managers
-  private val YARN = 1
-  private val STANDALONE = 2
-  private val MESOS = 4
-  private val LOCAL = 8
-  private val ALL_CLUSTER_MGRS = YARN | STANDALONE | MESOS | LOCAL
-
-  // Deploy modes
-  private val CLIENT = 1
-  private val CLUSTER = 2
-  private val ALL_DEPLOY_MODES = CLIENT | CLUSTER
-
   // A special jar name that indicates the class being run is inside of Spark itself, and therefore
   // no user jar is needed.
-  private val SPARK_INTERNAL = "spark-internal"
+  val SPARK_INTERNAL = "spark-internal"

   // Special primary resource names that represent shells rather than application jars.
-  private val SPARK_SHELL = "spark-shell"
-  private val PYSPARK_SHELL = "pyspark-shell"
+  val SPARK_SHELL = "spark-shell"
+  val PYSPARK_SHELL = "pyspark-shell"
+
+  // Special python classes
+  val PY4J_GATEWAYSERVER: String = "py4j.GatewayServer"
+  val PYTHON_RUNNER: String = "org.apache.spark.deploy.PythonRunner"
--- End diff --
Instead, I'd use `PythonRunner.getClass.getName.stripSuffix("$")`.
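vanzin's suggestion rests on a Scala detail: the runtime class of a singleton `object` is the synthetic module class, whose JVM name ends in `$`, and `stripSuffix("$")` removes that suffix to recover the plain class-name string. A standalone illustration (using a hypothetical stand-in object, not the real `org.apache.spark.deploy.PythonRunner`):

```scala
object ClassNames {
  /** JVM class name of a singleton object, without the synthetic trailing "$". */
  def moduleClassName(o: AnyRef): String = o.getClass.getName.stripSuffix("$")
}

// Hypothetical stand-in for org.apache.spark.deploy.PythonRunner.
object PythonRunnerDemo
```

Deriving the constant this way keeps it in sync with the class if it is ever renamed or moved, which is the advantage over a hand-written string literal.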
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18909183 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- (same createLaunchEnv hunk as quoted above)
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18909253 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala --- (same createLaunchEnv hunk as quoted above)
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18909359 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -83,216 +79,163 @@ object SparkSubmit {
    * (4) the main class for the child
    */
   private[spark] def createLaunchEnv(args: SparkSubmitArguments)
-      : (ArrayBuffer[String], ArrayBuffer[String], Map[String, String], String) = {
+      : (mutable.ArrayBuffer[String], mutable.ArrayBuffer[String], Map[String, String], String) = {
     // Values to return
-    val childArgs = new ArrayBuffer[String]()
-    val childClasspath = new ArrayBuffer[String]()
-    val sysProps = new HashMap[String, String]()
+    val childArgs = new mutable.ArrayBuffer[String]()
+    val childClasspath = new mutable.ArrayBuffer[String]()
+    val sysProps = new mutable.HashMap[String, String]()
     var childMainClass = ""

-    // Set the cluster manager
-    val clusterManager: Int = args.master match {
-      case m if m.startsWith("yarn") => YARN
-      case m if m.startsWith("spark") => STANDALONE
-      case m if m.startsWith("mesos") => MESOS
-      case m if m.startsWith("local") => LOCAL
-      case _ => printErrorAndExit("Master must start with yarn, spark, mesos, or local"); -1
-    }
-
-    // Set the deploy mode; default is client mode
-    var deployMode: Int = args.deployMode match {
-      case "client" | null => CLIENT
-      case "cluster" => CLUSTER
-      case _ => printErrorAndExit("Deploy mode must be either client or cluster"); -1
-    }
-
-    // Because "yarn-cluster" and "yarn-client" encapsulate both the master
-    // and deploy mode, we have some logic to infer the master and deploy mode
-    // from each other if only one is specified, or exit early if they are at odds.
-    if (clusterManager == YARN) {
-      if (args.master == "yarn-standalone") {
-        printWarning("\"yarn-standalone\" is deprecated. Use \"yarn-cluster\" instead.")
-        args.master = "yarn-cluster"
-      }
-      (args.master, args.deployMode) match {
-        case ("yarn-cluster", null) =>
-          deployMode = CLUSTER
-        case ("yarn-cluster", "client") =>
-          printErrorAndExit("Client deploy mode is not compatible with master \"yarn-cluster\"")
-        case ("yarn-client", "cluster") =>
-          printErrorAndExit("Cluster deploy mode is not compatible with master \"yarn-client\"")
-        case (_, mode) =>
-          args.master = "yarn-" + Option(mode).getOrElse("client")
-      }
-
+    if (args.clusterManagerFlag == CM_YARN) {
       // Make sure YARN is included in our build if we're trying to use it
       if (!Utils.classIsLoadable("org.apache.spark.deploy.yarn.Client") && !Utils.isTesting) {
         printErrorAndExit(
           "Could not load YARN classes. " +
           "This copy of Spark may not have been compiled with YARN support.")
       }
-    }
-
-    // The following modes are not supported or applicable
-    (clusterManager, deployMode) match {
-      case (MESOS, CLUSTER) =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for Mesos clusters.")
-      case (_, CLUSTER) if args.isPython =>
-        printErrorAndExit("Cluster deploy mode is currently not supported for python applications.")
-      case (_, CLUSTER) if isShell(args.primaryResource) =>
-        printErrorAndExit("Cluster deploy mode is not applicable to Spark shells.")
-      case _ =>
+      val hasHadoopEnv = sys.env.contains("HADOOP_CONF_DIR") || sys.env.contains("YARN_CONF_DIR")
+      if (!hasHadoopEnv && !Utils.isTesting) {
+        throw new Exception("When running with master '" + args.master + "' " +
+          "either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.")
+      }
     }

     // If we're running a python app, set the main class to our specific python runner
     if (args.isPython) {
       if (args.primaryResource == PYSPARK_SHELL) {
-        args.mainClass = "py4j.GatewayServer"
-        args.childArgs = ArrayBuffer("--die-on-broken-pipe", "0")
+        args.mainClass = PY4J_GATEWAYSERVER
+        args.childArgs = mutable.ArrayBuffer("--die-on-broken-pipe", "0")
       } else {
         // If a python file is provided, add it to the child arguments and list of files to deploy.
         // Usage: PythonAppRunner <main python file> <extra python files> [app arguments]
-        args.mainClass = "org.apache.spark.deploy.PythonRunner"
-        args.childArgs = ArrayBuffer(args.primaryResource, args.pyFiles) ++ args.childArgs
-        args.files = mergeFileLists(args.files, args.primaryResource)
+        args.mainClass = PYTHON_RUNNER
+        args.childArgs =
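The block removed by this diff dispatches on the prefix of the master URL to pick a cluster manager. A minimal, self-contained sketch of that dispatch pattern, using simplified stand-ins for Spark's integer constants and for printErrorAndExit:

```scala
// Sketch only: YARN/STANDALONE/MESOS/LOCAL are illustrative stand-ins for the
// private constants SparkSubmit defines; sys.error stands in for printErrorAndExit.
object ClusterManagerSketch {
  val YARN = 1; val STANDALONE = 2; val MESOS = 3; val LOCAL = 4

  // Guarded pattern match on the master URL prefix, as in the old createLaunchEnv.
  def detect(master: String): Int = master match {
    case m if m.startsWith("yarn")  => YARN
    case m if m.startsWith("spark") => STANDALONE
    case m if m.startsWith("mesos") => MESOS
    case m if m.startsWith("local") => LOCAL
    case _ => sys.error("Master must start with yarn, spark, mesos, or local")
  }
}
```

The refactor under review moves this decision out of createLaunchEnv and into a precomputed args.clusterManagerFlag.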
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18909554 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18909952 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18910093 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala ---
@@ -397,9 +347,9 @@ object SparkSubmit {
    * Provides an indirection layer for passing arguments as system properties or flags to
    * the user's driver program or to downstream launcher tools.
    */
-private[spark] case class OptionAssigner(
-    value: String,
-    clusterManager: Int,
-    deployMode: Int,
-    clOption: String = null,
-    sysProp: String = null)
+private[spark] case class OptionAssigner(configKey: String,
+    clusterManager: Int,
--- End diff --
Still broken here too. Style is:

    def foo(
        arg1: Blah,
        arg2: Blah) {

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
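The style being requested puts the opening parenthesis on the declaration line and each parameter on its own indented line. A hypothetical declaration written that way (the class name and fields here are illustrative, not the PR's final code):

```scala
// Multi-line parameter style: open paren at end of the first line,
// then one parameter per line, indented.
case class OptionAssignerStyle(
    configKey: String,
    clusterManager: Int,
    deployMode: Int,
    clOption: String = null,
    sysProp: String = null)
```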
[GitHub] spark pull request: SPARK-1209 [CORE] SparkHadoop{MapRed,MapReduce...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2814#issuecomment-59246755 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21768/ Test FAILed.
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18910401 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -17,201 +17,286 @@ package org.apache.spark.deploy

-import java.io.{File, FileInputStream, IOException}
-import java.util.Properties
+import java.io._
 import java.util.jar.JarFile

-import scala.collection.JavaConversions._
-import scala.collection.mutable.{ArrayBuffer, HashMap}
+import scala.collection._

 import org.apache.spark.SparkException
+import org.apache.spark.deploy.ConfigConstants._
 import org.apache.spark.util.Utils
+import org.apache.spark.deploy.SparkSubmitArguments._

 /**
- * Parses and encapsulates arguments from the spark-submit script.
- * The env argument is used for testing.
- */
-private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) {
-  var master: String = null
-  var deployMode: String = null
-  var executorMemory: String = null
-  var executorCores: String = null
-  var totalExecutorCores: String = null
-  var propertiesFile: String = null
-  var driverMemory: String = null
-  var driverExtraClassPath: String = null
-  var driverExtraLibraryPath: String = null
-  var driverExtraJavaOptions: String = null
-  var driverCores: String = null
-  var supervise: Boolean = false
-  var queue: String = null
-  var numExecutors: String = null
-  var files: String = null
-  var archives: String = null
-  var mainClass: String = null
-  var primaryResource: String = null
-  var name: String = null
-  var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
-  var jars: String = null
-  var verbose: Boolean = false
-  var isPython: Boolean = false
-  var pyFiles: String = null
-  val sparkProperties: HashMap[String, String] = new HashMap[String, String]()
-
-  /** Default properties present in the currently defined defaults file. */
-  lazy val defaultSparkProperties: HashMap[String, String] = {
-    val defaultProperties = new HashMap[String, String]()
-    if (verbose) SparkSubmit.printStream.println(s"Using properties file: $propertiesFile")
-    Option(propertiesFile).foreach { filename =>
-      val file = new File(filename)
-      SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) =>
-        if (k.startsWith("spark")) {
-          defaultProperties(k) = v
-          if (verbose) SparkSubmit.printStream.println(s"Adding default property: $k=$v")
-        } else {
-          SparkSubmit.printWarning(s"Ignoring non-spark config property: $k=$v")
-        }
-      }
-    }
-    defaultProperties
+ * Pulls and validates configuration information together in order of priority
+ *
+ * Entries in the conf Map will be filled in the following priority order
+ * 1. entries specified on the command line (except from --conf entries)
+ * 2. Entries specified on the command line with --conf
+ * 3. Legacy environment variables
+ * 4. SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf if either exist
+ * 5. hard coded defaults
+ */
+private[spark] class SparkSubmitArguments(args: Seq[String]) {
+  /**
+   * Stores all configuration items except for child arguments,
+   * referenced by the constants defined in ConfigConstants.scala.
+   */
+  val conf = new mutable.HashMap[String, String]()
+
+  def master = conf(SPARK_MASTER)
+  def master_= (value: String): Unit = conf.put(SPARK_MASTER, value)
+
+  def executorMemory = conf(SPARK_EXECUTOR_MEMORY)
+  def executorMemory_= (value: String): Unit = conf.put(SPARK_EXECUTOR_MEMORY, value)
+
+  def executorCores = conf(SPARK_EXECUTOR_CORES)
+  def executorCores_= (value: String): Unit = conf.put(SPARK_EXECUTOR_CORES, value)
+
+  def totalExecutorCores = conf.get(SPARK_CORES_MAX)
+  def totalExecutorCores_= (value: String): Unit = conf.put(SPARK_CORES_MAX, value)
+
+  def driverMemory = conf(SPARK_DRIVER_MEMORY)
+  def driverMemory_= (value: String): Unit = conf.put(SPARK_DRIVER_MEMORY, value)
+
+  def driverExtraClassPath = conf.get(SPARK_DRIVER_EXTRA_CLASSPATH)
+  def driverExtraClassPath_= (value: String): Unit = conf.put(SPARK_DRIVER_EXTRA_CLASSPATH, value)
+
+  def driverExtraLibraryPath = conf.get(SPARK_DRIVER_EXTRA_LIBRARY_PATH)
+  def driverExtraLibraryPath_= (value: String): Unit = conf.put(SPARK_DRIVER_EXTRA_LIBRARY_PATH, value)
+
+  def driverExtraJavaOptions = conf.get(SPARK_DRIVER_EXTRA_JAVA_OPTIONS)
+  def driverExtraJavaOptions_= (value: String): Unit = conf.put(SPARK_DRIVER_EXTRA_JAVA_OPTIONS, value)
+
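The new SparkSubmitArguments keeps every setting in one conf map but preserves the old field-style call sites by pairing each getter with a Scala `foo_=` setter. A minimal, runnable sketch of that pattern (the key string here is a stand-in, not necessarily the value of Spark's SPARK_MASTER constant):

```scala
import scala.collection.mutable

// Map-backed property pattern: `master` reads from the map, and defining
// `master_=` lets callers write `args.master = ...` as if it were a var,
// while every assignment actually lands in the shared conf map.
class ConfBackedArgs {
  val SPARK_MASTER = "spark.master"  // illustrative key
  val conf = new mutable.HashMap[String, String]()

  def master: String = conf(SPARK_MASTER)
  def master_=(value: String): Unit = conf.put(SPARK_MASTER, value)
}
```

Usage: after `args.master = "yarn-cluster"`, both `args.master` and `args.conf("spark.master")` return the same value, so the priority-merging logic only ever has one map to reconcile.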
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18910527 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18910722 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18911158 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---
@@ -227,91 +312,92 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St
    */
   def parse(opts: Seq[String]): Unit = opts match {
     case ("--name") :: value :: tail =>
-      name = value
+      cmdLineConfig.put(SPARK_APP_NAME, value)
       parse(tail)
     case ("--master") :: value :: tail =>
-      master = value
+      cmdLineConfig.put(SPARK_MASTER, value)
       parse(tail)
     case ("--class") :: value :: tail =>
-      mainClass = value
+      cmdLineConfig.put(SPARK_APP_CLASS, value)
       parse(tail)
     case ("--deploy-mode") :: value :: tail =>
-      if (value != "client" && value != "cluster") {
-        SparkSubmit.printErrorAndExit("--deploy-mode must be either \"client\" or \"cluster\"")
-      }
-      deployMode = value
+      cmdLineConfig.put(SPARK_DEPLOY_MODE, value)
       parse(tail)
     case ("--num-executors") :: value :: tail =>
-      numExecutors = value
+      cmdLineConfig.put(SPARK_EXECUTOR_INSTANCES, value)
       parse(tail)
     case ("--total-executor-cores") :: value :: tail =>
-      totalExecutorCores = value
+      cmdLineConfig.put(SPARK_CORES_MAX, value)
       parse(tail)
     case ("--executor-cores") :: value :: tail =>
-      executorCores = value
+      cmdLineConfig.put(SPARK_EXECUTOR_CORES, value)
       parse(tail)
     case ("--executor-memory") :: value :: tail =>
-      executorMemory = value
+      cmdLineConfig.put(SPARK_EXECUTOR_MEMORY, value)
       parse(tail)
     case ("--driver-memory") :: value :: tail =>
-      driverMemory = value
+      cmdLineConfig.put(SPARK_DRIVER_MEMORY, value)
       parse(tail)
     case ("--driver-cores") :: value :: tail =>
-      driverCores = value
+      cmdLineConfig.put(SPARK_DRIVER_CORES, value)
       parse(tail)
     case ("--driver-class-path") :: value :: tail =>
-      driverExtraClassPath = value
+      cmdLineConfig.put(SPARK_DRIVER_EXTRA_CLASSPATH, value)
       parse(tail)
     case ("--driver-java-options") :: value :: tail =>
-      driverExtraJavaOptions = value
+      cmdLineConfig.put(SPARK_DRIVER_EXTRA_JAVA_OPTIONS, value)
       parse(tail)
     case ("--driver-library-path") :: value :: tail =>
-      driverExtraLibraryPath = value
+      cmdLineConfig.put(SPARK_DRIVER_EXTRA_LIBRARY_PATH, value)
       parse(tail)
     case ("--properties-file") :: value :: tail =>
-      propertiesFile = value
+      /* We merge the property file config options into the rest of the command lines options
--- End diff --
nit: see previous comment about multi-line comments.
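The parse() method above walks the argument list recursively: each case peels one flag and its value off the front with the cons pattern and recurses on the tail. A self-contained sketch of that technique (the keys and the two flags shown are illustrative, not Spark's full ConfigConstants set):

```scala
import scala.collection.mutable

// Recursive cons-pattern CLI parsing, as in SparkSubmitArguments.parse:
// "--flag" :: value :: tail matches only when a flag and its value are both
// present; Nil ends the recursion; anything else is an unrecognized option.
object ArgParseSketch {
  def parse(opts: List[String], acc: mutable.Map[String, String]): Unit = opts match {
    case "--name" :: value :: tail =>
      acc.put("spark.app.name", value)  // illustrative key
      parse(tail, acc)
    case "--master" :: value :: tail =>
      acc.put("spark.master", value)    // illustrative key
      parse(tail, acc)
    case Nil => ()
    case unknown :: _ => sys.error(s"Unrecognized option: $unknown")
  }
}
```

A flag given without a value falls through to the `unknown :: _` case, which is why the real parser can report malformed invocations precisely.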
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18911121 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -17,201 +17,286 @@ package org.apache.spark.deploy -import java.io.{File, FileInputStream, IOException} -import java.util.Properties +import java.io._ import java.util.jar.JarFile -import scala.collection.JavaConversions._ -import scala.collection.mutable.{ArrayBuffer, HashMap} +import scala.collection._ import org.apache.spark.SparkException +import org.apache.spark.deploy.ConfigConstants._ import org.apache.spark.util.Utils +import org.apache.spark.deploy.SparkSubmitArguments._ /** - * Parses and encapsulates arguments from the spark-submit script. - * The env argument is used for testing. - */ -private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, String] = sys.env) { - var master: String = null - var deployMode: String = null - var executorMemory: String = null - var executorCores: String = null - var totalExecutorCores: String = null - var propertiesFile: String = null - var driverMemory: String = null - var driverExtraClassPath: String = null - var driverExtraLibraryPath: String = null - var driverExtraJavaOptions: String = null - var driverCores: String = null - var supervise: Boolean = false - var queue: String = null - var numExecutors: String = null - var files: String = null - var archives: String = null - var mainClass: String = null - var primaryResource: String = null - var name: String = null - var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]() - var jars: String = null - var verbose: Boolean = false - var isPython: Boolean = false - var pyFiles: String = null - val sparkProperties: HashMap[String, String] = new HashMap[String, String]() - - /** Default properties present in the currently defined defaults file. 
*/ - lazy val defaultSparkProperties: HashMap[String, String] = { -val defaultProperties = new HashMap[String, String]() -if (verbose) SparkSubmit.printStream.println(sUsing properties file: $propertiesFile) -Option(propertiesFile).foreach { filename = - val file = new File(filename) - SparkSubmitArguments.getPropertiesFromFile(file).foreach { case (k, v) = -if (k.startsWith(spark)) { - defaultProperties(k) = v - if (verbose) SparkSubmit.printStream.println(sAdding default property: $k=$v) -} else { - SparkSubmit.printWarning(sIgnoring non-spark config property: $k=$v) -} - } -} -defaultProperties + * Pulls and validates configuration information together in order of priority + * + * Entries in the conf Map will be filled in the following priority order + * 1. entries specified on the command line (except from --conf entries) + * 2. Entries specified on the command line with --conf + * 3. Legacy environment variables + * 4 SPARK_DEFAULT_CONF/spark-defaults.conf or SPARK_HOME/conf/spark-defaults.conf if either exist + * 5. hard coded defaults + * +*/ +private[spark] class SparkSubmitArguments(args: Seq[String]) { + /** + * Stores all configuration items except for child arguments, + * referenced by the constants defined in ConfigConstants.scala. 
+ */ + val conf = new mutable.HashMap[String, String]() + + def master = conf(SPARK_MASTER) + def master_= (value: String):Unit = conf.put(SPARK_MASTER, value) + + def executorMemory = conf(SPARK_EXECUTOR_MEMORY) + def executorMemory_= (value: String):Unit = conf.put(SPARK_EXECUTOR_MEMORY, value) + + def executorCores = conf(SPARK_EXECUTOR_CORES) + def executorCores_= (value: String):Unit = conf.put(SPARK_EXECUTOR_CORES, value) + + def totalExecutorCores = conf.get(SPARK_CORES_MAX) + def totalExecutorCores_= (value: String):Unit = conf.put(SPARK_CORES_MAX, value) + + def driverMemory = conf(SPARK_DRIVER_MEMORY) + def driverMemory_= (value: String):Unit = conf.put(SPARK_DRIVER_MEMORY, value) + + def driverExtraClassPath = conf.get(SPARK_DRIVER_EXTRA_CLASSPATH) + def driverExtraClassPath_= (value: String):Unit = conf.put(SPARK_DRIVER_EXTRA_CLASSPATH, value) + + def driverExtraLibraryPath = conf.get(SPARK_DRIVER_EXTRA_LIBRARY_PATH) + def driverExtraLibraryPath_= (value: String):Unit = conf.put(SPARK_DRIVER_EXTRA_LIBRARY_PATH, value) + + def driverExtraJavaOptions = conf.get(SPARK_DRIVER_EXTRA_JAVA_OPTIONS) + def driverExtraJavaOptions_= (value: String):Unit = conf.put(SPARK_DRIVER_EXTRA_JAVA_OPTIONS, value) +
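The diff above replaces the old bag of `var` fields with a single `conf` map fronted by Scala getter/setter pairs. A minimal sketch of that accessor pattern, with a hypothetical key constant standing in for the ones defined in ConfigConstants.scala:

```scala
import scala.collection.mutable

// Sketch of the accessor pattern in the diff: each "field" is a
// getter/setter pair over one shared mutable map keyed by a constant.
object ConfSketch {
  val SPARK_MASTER = "spark.master" // hypothetical key constant

  val conf = new mutable.HashMap[String, String]()

  // Scala's `name_=` convention makes `master = "x"` sugar for conf.put(...)
  def master: String = conf(SPARK_MASTER)
  def master_=(value: String): Unit = conf.put(SPARK_MASTER, value)
}
```

With this in place, `ConfSketch.master = "local[4]"` writes through to the map, so callers keep field-style syntax while all state lives in `conf`.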
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18911187 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -227,91 +312,92 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St */ def parse(opts: Seq[String]): Unit = opts match { case (--name) :: value :: tail = -name = value +cmdLineConfig.put(SPARK_APP_NAME, value) parse(tail) case (--master) :: value :: tail = -master = value +cmdLineConfig.put(SPARK_MASTER, value) parse(tail) case (--class) :: value :: tail = -mainClass = value +cmdLineConfig.put(SPARK_APP_CLASS, value) parse(tail) case (--deploy-mode) :: value :: tail = -if (value != client value != cluster) { - SparkSubmit.printErrorAndExit(--deploy-mode must be either \client\ or \cluster\) -} -deployMode = value +cmdLineConfig.put(SPARK_DEPLOY_MODE, value) parse(tail) case (--num-executors) :: value :: tail = -numExecutors = value +cmdLineConfig.put(SPARK_EXECUTOR_INSTANCES, value) parse(tail) case (--total-executor-cores) :: value :: tail = -totalExecutorCores = value +cmdLineConfig.put(SPARK_CORES_MAX, value) parse(tail) case (--executor-cores) :: value :: tail = -executorCores = value +cmdLineConfig.put(SPARK_EXECUTOR_CORES, value) parse(tail) case (--executor-memory) :: value :: tail = -executorMemory = value +cmdLineConfig.put(SPARK_EXECUTOR_MEMORY, value) parse(tail) case (--driver-memory) :: value :: tail = -driverMemory = value +cmdLineConfig.put(SPARK_DRIVER_MEMORY, value) parse(tail) case (--driver-cores) :: value :: tail = -driverCores = value +cmdLineConfig.put(SPARK_DRIVER_CORES, value) parse(tail) case (--driver-class-path) :: value :: tail = -driverExtraClassPath = value +cmdLineConfig.put(SPARK_DRIVER_EXTRA_CLASSPATH, value) parse(tail) case (--driver-java-options) :: value :: tail = -driverExtraJavaOptions = value +cmdLineConfig.put(SPARK_DRIVER_EXTRA_JAVA_OPTIONS, value) parse(tail) case 
(--driver-library-path) :: value :: tail = -driverExtraLibraryPath = value +cmdLineConfig.put(SPARK_DRIVER_EXTRA_LIBRARY_PATH, value) parse(tail) case (--properties-file) :: value :: tail = -propertiesFile = value +/* We merge the property file config options into the rest of the command lines options + * after we have finished the rest of the command line processing as property files + * cannot override explicit command line options . --- End diff -- nit: extra space before `.`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
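The `parse` method under review peels `--flag value` pairs off the head of the argument list and recurses on the tail. A self-contained sketch of that pattern, using illustrative string keys rather than the real constants:

```scala
import scala.collection.mutable

// Sketch of the recursive option-parsing pattern in the diff: each case
// consumes "--flag value" from the head of the list and recurses on the rest.
val cmdLineConfig = mutable.Map[String, String]()

def parse(opts: List[String]): Unit = opts match {
  case "--name" :: value :: tail =>
    cmdLineConfig.put("spark.app.name", value) // illustrative key
    parse(tail)
  case "--master" :: value :: tail =>
    cmdLineConfig.put("spark.master", value) // illustrative key
    parse(tail)
  case Nil => () // all options consumed
  case unknown :: _ =>
    throw new IllegalArgumentException(s"Unrecognized option: $unknown")
}
```

Each case is one flag; adding an option is a new `case` line, which is why the diff touches so many near-identical cases.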
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18911284 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -227,91 +312,92 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St */ def parse(opts: Seq[String]): Unit = opts match { case (--name) :: value :: tail = -name = value +cmdLineConfig.put(SPARK_APP_NAME, value) parse(tail) case (--master) :: value :: tail = -master = value +cmdLineConfig.put(SPARK_MASTER, value) parse(tail) case (--class) :: value :: tail = -mainClass = value +cmdLineConfig.put(SPARK_APP_CLASS, value) parse(tail) case (--deploy-mode) :: value :: tail = -if (value != client value != cluster) { - SparkSubmit.printErrorAndExit(--deploy-mode must be either \client\ or \cluster\) -} -deployMode = value +cmdLineConfig.put(SPARK_DEPLOY_MODE, value) parse(tail) case (--num-executors) :: value :: tail = -numExecutors = value +cmdLineConfig.put(SPARK_EXECUTOR_INSTANCES, value) parse(tail) case (--total-executor-cores) :: value :: tail = -totalExecutorCores = value +cmdLineConfig.put(SPARK_CORES_MAX, value) parse(tail) case (--executor-cores) :: value :: tail = -executorCores = value +cmdLineConfig.put(SPARK_EXECUTOR_CORES, value) parse(tail) case (--executor-memory) :: value :: tail = -executorMemory = value +cmdLineConfig.put(SPARK_EXECUTOR_MEMORY, value) parse(tail) case (--driver-memory) :: value :: tail = -driverMemory = value +cmdLineConfig.put(SPARK_DRIVER_MEMORY, value) parse(tail) case (--driver-cores) :: value :: tail = -driverCores = value +cmdLineConfig.put(SPARK_DRIVER_CORES, value) parse(tail) case (--driver-class-path) :: value :: tail = -driverExtraClassPath = value +cmdLineConfig.put(SPARK_DRIVER_EXTRA_CLASSPATH, value) parse(tail) case (--driver-java-options) :: value :: tail = -driverExtraJavaOptions = value +cmdLineConfig.put(SPARK_DRIVER_EXTRA_JAVA_OPTIONS, value) parse(tail) case 
(--driver-library-path) :: value :: tail = -driverExtraLibraryPath = value +cmdLineConfig.put(SPARK_DRIVER_EXTRA_LIBRARY_PATH, value) parse(tail) case (--properties-file) :: value :: tail = -propertiesFile = value +/* We merge the property file config options into the rest of the command lines options + * after we have finished the rest of the command line processing as property files + * cannot override explicit command line options . + */ +cmdLinePropertyFileValues ++= Utils.getPropertyValuesFromFile(value) --- End diff -- So, this is actually introducing different behavior from before. A command line like this: --properties-file foo --properties-file bar Would only load bar before, but now it's loading both. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
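The behavioral difference vanzin points out can be sketched directly. With file contents stubbed as in-memory maps (hypothetical keys and values), "last wins" keeps only the final `--properties-file` argument, while the new accumulating `++=` merges every file:

```scala
// Stand-ins for the contents of the two property files.
val foo = Map("spark.a" -> "1", "spark.b" -> "2")
val bar = Map("spark.b" -> "3")

// Old behavior: each --properties-file overwrote the previously stored
// path, so only the last file was ever loaded.
val lastWins = bar

// New behavior: every file's values are appended; on key collisions the
// later file still wins because ++ keeps the right-hand side.
val accumulated = foo ++ bar
```

So `spark.a` from `foo` survives only under the new behavior, which is the compatibility change flagged in the review.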
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18911428 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -398,22 +478,117 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St } } -object SparkSubmitArguments { - /** Load properties present in the given file. */ - def getPropertiesFromFile(file: File): Seq[(String, String)] = { -require(file.exists(), sProperties file $file does not exist) -require(file.isFile(), sProperties file $file is not a normal file) -val inputStream = new FileInputStream(file) +private[spark] object SparkSubmitArguments { + /** + * Default property values - string literals are defined in ConfigConstants.scala + */ + val DEFAULTS = Map( +SPARK_MASTER - local[*], +SPARK_VERBOSE - false, +SPARK_DEPLOY_MODE - client, +SPARK_EXECUTOR_MEMORY - 1g, +SPARK_EXECUTOR_CORES - 1 , +SPARK_EXECUTOR_INSTANCES - 2, +SPARK_DRIVER_MEMORY - 512m, +SPARK_DRIVER_CORES - 1, +SPARK_DRIVER_SUPERVISE - false, +SPARK_YARN_QUEUE - default, +SPARK_EXECUTOR_INSTANCES - 2 + ) + + /** + * Config items that should only be set from the command line + */ + val CMD_LINE_ONLY_KEYS = Set ( +SPARK_VERBOSE, +SPARK_APP_CLASS, +SPARK_APP_PRIMARY_RESOURCE + ) + + /** + * Used to support legacy environment variable mappings + */ + val LEGACY_ENV_VARS = Map ( +MASTER - SPARK_MASTER, +DEPLOY_MODE - SPARK_DEPLOY_MODE, +SPARK_DRIVER_MEMORY - SPARK_DRIVER_MEMORY, +SPARK_EXECUTOR_MEMORY - SPARK_EXECUTOR_MEMORY + ) + + /** + * Function returns the spark submit default config map (Map[configName-ConfigValue]) + * Function is over-writable to allow for easier debugging + */ + private[spark] var getHardCodedDefaultValues: () = Map[String, String] = () = { --- End diff -- This feels like a long way around to achieve something simple: why not just an argument to `mergeSparkProperties()` with the default values? 
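vanzin's alternative to the overridable `var getHardCodedDefaultValues` can be sketched as a parameter with a default value: tests substitute their own map at the call site instead of mutating shared state. Names and keys here are hypothetical:

```scala
// Hard-coded defaults, as in the DEFAULTS map under review (keys illustrative).
val DEFAULTS = Map(
  "spark.master"         -> "local[*]",
  "spark.executor.cores" -> "1")

// Instead of an overridable function-valued var, take the defaults as an
// argument with a default value; command-line entries win on collisions.
def mergeSparkProperties(
    cmdLine: Map[String, String],
    defaults: Map[String, String] = DEFAULTS): Map[String, String] =
  defaults ++ cmdLine
```

Production code calls `mergeSparkProperties(cmdLine)`; a test calls `mergeSparkProperties(cmdLine, fakeDefaults)` with no global mutation involved.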
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18911516 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -398,22 +478,117 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St } } -object SparkSubmitArguments { - /** Load properties present in the given file. */ - def getPropertiesFromFile(file: File): Seq[(String, String)] = { -require(file.exists(), sProperties file $file does not exist) -require(file.isFile(), sProperties file $file is not a normal file) -val inputStream = new FileInputStream(file) +private[spark] object SparkSubmitArguments { + /** + * Default property values - string literals are defined in ConfigConstants.scala + */ + val DEFAULTS = Map( +SPARK_MASTER - local[*], +SPARK_VERBOSE - false, +SPARK_DEPLOY_MODE - client, +SPARK_EXECUTOR_MEMORY - 1g, +SPARK_EXECUTOR_CORES - 1 , +SPARK_EXECUTOR_INSTANCES - 2, +SPARK_DRIVER_MEMORY - 512m, +SPARK_DRIVER_CORES - 1, +SPARK_DRIVER_SUPERVISE - false, +SPARK_YARN_QUEUE - default, +SPARK_EXECUTOR_INSTANCES - 2 + ) + + /** + * Config items that should only be set from the command line + */ + val CMD_LINE_ONLY_KEYS = Set ( +SPARK_VERBOSE, +SPARK_APP_CLASS, +SPARK_APP_PRIMARY_RESOURCE + ) + + /** + * Used to support legacy environment variable mappings + */ + val LEGACY_ENV_VARS = Map ( +MASTER - SPARK_MASTER, +DEPLOY_MODE - SPARK_DEPLOY_MODE, +SPARK_DRIVER_MEMORY - SPARK_DRIVER_MEMORY, +SPARK_EXECUTOR_MEMORY - SPARK_EXECUTOR_MEMORY + ) + + /** + * Function returns the spark submit default config map (Map[configName-ConfigValue]) + * Function is over-writable to allow for easier debugging + */ + private[spark] var getHardCodedDefaultValues: () = Map[String, String] = () = { +DEFAULTS + } + + /** + * System environment variables. 
+ * Function is over-writable to allow for easier debugging + */ + private[spark] var genEnvVars: () = Map[String, String] = () = --- End diff -- Similar to previous. You can use arguments with default values if you want to encapsulate the actual default implementation within this class. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18911832 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala --- @@ -398,22 +478,117 @@ private[spark] class SparkSubmitArguments(args: Seq[String], env: Map[String, St } } -object SparkSubmitArguments { - /** Load properties present in the given file. */ - def getPropertiesFromFile(file: File): Seq[(String, String)] = { -require(file.exists(), sProperties file $file does not exist) -require(file.isFile(), sProperties file $file is not a normal file) -val inputStream = new FileInputStream(file) +private[spark] object SparkSubmitArguments { + /** + * Default property values - string literals are defined in ConfigConstants.scala + */ + val DEFAULTS = Map( +SPARK_MASTER - local[*], +SPARK_VERBOSE - false, +SPARK_DEPLOY_MODE - client, +SPARK_EXECUTOR_MEMORY - 1g, +SPARK_EXECUTOR_CORES - 1 , +SPARK_EXECUTOR_INSTANCES - 2, +SPARK_DRIVER_MEMORY - 512m, +SPARK_DRIVER_CORES - 1, +SPARK_DRIVER_SUPERVISE - false, +SPARK_YARN_QUEUE - default, +SPARK_EXECUTOR_INSTANCES - 2 + ) + + /** + * Config items that should only be set from the command line + */ + val CMD_LINE_ONLY_KEYS = Set ( +SPARK_VERBOSE, +SPARK_APP_CLASS, +SPARK_APP_PRIMARY_RESOURCE + ) + + /** + * Used to support legacy environment variable mappings + */ + val LEGACY_ENV_VARS = Map ( +MASTER - SPARK_MASTER, +DEPLOY_MODE - SPARK_DEPLOY_MODE, +SPARK_DRIVER_MEMORY - SPARK_DRIVER_MEMORY, +SPARK_EXECUTOR_MEMORY - SPARK_EXECUTOR_MEMORY + ) + + /** + * Function returns the spark submit default config map (Map[configName-ConfigValue]) + * Function is over-writable to allow for easier debugging + */ + private[spark] var getHardCodedDefaultValues: () = Map[String, String] = () = { +DEFAULTS + } + + /** + * System environment variables. 
+ * Function is over-writable to allow for easier debugging + */ + private[spark] var genEnvVars: () = Map[String, String] = () = +sys.env.filterKeys( x = x.toLowerCase.startsWith(spark) ) + + /** + * Gets configuration from reading SPARK_CONF_DIR/spark-defaults.conf if it exists + * otherwise reads SPARK_HOME/conf/spark-defaults.conf if it exists + * otherwise returns an empty config structure + * Function is over-writable to allow for easier debugging --- End diff -- So, third time you're adding these... yet I don't see you using the overridability feature anywhere. I'd understand if you were using this in the tests, but what's your goal here? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18911938 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala --- @@ -50,71 +51,69 @@ private[spark] object SparkSubmitDriverBootstrapper { val javaOpts = sys.env(JAVA_OPTS) val defaultDriverMemory = sys.env(OUR_JAVA_MEM) -// Spark submit specific environment variables -val deployMode = sys.env(SPARK_SUBMIT_DEPLOY_MODE) -val propertiesFile = sys.env(SPARK_SUBMIT_PROPERTIES_FILE) +// SPARK_SUBMIT_BOOTSTRAP_DRIVER is used for runtime validation val bootstrapDriver = sys.env(SPARK_SUBMIT_BOOTSTRAP_DRIVER) -val submitDriverMemory = sys.env.get(SPARK_SUBMIT_DRIVER_MEMORY) -val submitLibraryPath = sys.env.get(SPARK_SUBMIT_LIBRARY_PATH) -val submitClasspath = sys.env.get(SPARK_SUBMIT_CLASSPATH) -val submitJavaOpts = sys.env.get(SPARK_SUBMIT_OPTS) + +// list of environment variables that override differently named properties +val envOverides = Map( OUR_JAVA_MEM - SPARK_DRIVER_MEMORY, + SPARK_SUBMIT_DEPLOY_MODE - SPARK_DEPLOY_MODE, + SPARK_SUBMIT_DRIVER_MEMORY - SPARK_DRIVER_MEMORY, + SPARK_SUBMIT_LIBRARY_PATH - SPARK_DRIVER_EXTRA_LIBRARY_PATH, + SPARK_SUBMIT_CLASSPATH - SPARK_DRIVER_EXTRA_CLASSPATH, + SPARK_SUBMIT_OPTS - SPARK_DRIVER_EXTRA_JAVA_OPTIONS +) + +/* SPARK_SUBMIT environment variables are treated as the highest priority source --- End diff -- nit: use single-line comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18911947 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala --- @@ -50,71 +51,69 @@ private[spark] object SparkSubmitDriverBootstrapper { val javaOpts = sys.env(JAVA_OPTS) val defaultDriverMemory = sys.env(OUR_JAVA_MEM) -// Spark submit specific environment variables -val deployMode = sys.env(SPARK_SUBMIT_DEPLOY_MODE) -val propertiesFile = sys.env(SPARK_SUBMIT_PROPERTIES_FILE) +// SPARK_SUBMIT_BOOTSTRAP_DRIVER is used for runtime validation val bootstrapDriver = sys.env(SPARK_SUBMIT_BOOTSTRAP_DRIVER) -val submitDriverMemory = sys.env.get(SPARK_SUBMIT_DRIVER_MEMORY) -val submitLibraryPath = sys.env.get(SPARK_SUBMIT_LIBRARY_PATH) -val submitClasspath = sys.env.get(SPARK_SUBMIT_CLASSPATH) -val submitJavaOpts = sys.env.get(SPARK_SUBMIT_OPTS) + +// list of environment variables that override differently named properties +val envOverides = Map( OUR_JAVA_MEM - SPARK_DRIVER_MEMORY, --- End diff -- nit: no space after `(` --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18911964 --- Diff: core/src/main/scala/org/apache/spark/deploy/SparkSubmitDriverBootstrapper.scala --- @@ -50,71 +51,69 @@ private[spark] object SparkSubmitDriverBootstrapper { val javaOpts = sys.env(JAVA_OPTS) val defaultDriverMemory = sys.env(OUR_JAVA_MEM) -// Spark submit specific environment variables -val deployMode = sys.env(SPARK_SUBMIT_DEPLOY_MODE) -val propertiesFile = sys.env(SPARK_SUBMIT_PROPERTIES_FILE) +// SPARK_SUBMIT_BOOTSTRAP_DRIVER is used for runtime validation val bootstrapDriver = sys.env(SPARK_SUBMIT_BOOTSTRAP_DRIVER) -val submitDriverMemory = sys.env.get(SPARK_SUBMIT_DRIVER_MEMORY) -val submitLibraryPath = sys.env.get(SPARK_SUBMIT_LIBRARY_PATH) -val submitClasspath = sys.env.get(SPARK_SUBMIT_CLASSPATH) -val submitJavaOpts = sys.env.get(SPARK_SUBMIT_OPTS) + +// list of environment variables that override differently named properties +val envOverides = Map( OUR_JAVA_MEM - SPARK_DRIVER_MEMORY, + SPARK_SUBMIT_DEPLOY_MODE - SPARK_DEPLOY_MODE, + SPARK_SUBMIT_DRIVER_MEMORY - SPARK_DRIVER_MEMORY, + SPARK_SUBMIT_LIBRARY_PATH - SPARK_DRIVER_EXTRA_LIBRARY_PATH, + SPARK_SUBMIT_CLASSPATH - SPARK_DRIVER_EXTRA_CLASSPATH, + SPARK_SUBMIT_OPTS - SPARK_DRIVER_EXTRA_JAVA_OPTIONS +) + +/* SPARK_SUBMIT environment variables are treated as the highest priority source + * of config information for their respective config variable (as listed in envOverrides) + */ +val submitEnvVars = new HashMap() ++ envOverides + .map { case(varName, propName) = (sys.env.get(varName), propName) } + .filter { case(variable, _) = variable.isDefined } + .map { case(variable, propName) = propName - variable.get } + +// Property file loading comes after all SPARK* env variables are processed and should not +// overwrite existing SPARK env variables +sys.env.get(SPARK_SUBMIT_PROPERTIES_FILE) +.flatMap ( Utils.getFileIfExists ) +.map ( Utils.loadPropFile ) +.getOrElse(Map.empty) 
+.foreach { case(k,v) = + submitEnvVars.getOrElseUpdate(k,v) +} + + /* See docco for SparkSubmitArguments to see the various config sources and their priority. --- End diff -- alignment is wrong, also, use single-line comments. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
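The `envOverides` transformation in the diff takes each `(envVar -> propName)` pair, keeps it only if the environment variable is set, and re-keys the value under the Spark property name. A sketch with `sys.env` stubbed as a plain map (property names illustrative):

```scala
// Stand-in for sys.env so the sketch is deterministic.
val env = Map("SPARK_SUBMIT_DRIVER_MEMORY" -> "2g")

// Environment variables that override differently named properties.
val envOverrides = Map(
  "SPARK_SUBMIT_DRIVER_MEMORY" -> "spark.driver.memory",
  "SPARK_SUBMIT_DEPLOY_MODE"   -> "spark.submit.deployMode")

// flatMap drops unset variables and renames the rest in one pass,
// collapsing the diff's map/filter/map chain into a single step.
val submitEnvVars: Map[String, String] =
  envOverrides.flatMap { case (varName, propName) =>
    env.get(varName).map(propName -> _)
  }
```

Only `SPARK_SUBMIT_DRIVER_MEMORY` is set in the stub, so the result holds a single re-keyed entry.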
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on a diff in the pull request: https://github.com/apache/spark/pull/2516#discussion_r18912066 --- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala --- @@ -1479,6 +1479,14 @@ private[spark] object Utils extends Logging { PropertyConfigurator.configure(pro) } + /** + * Flatten a map of maps out into a single map, later maps in the propList --- End diff -- Code and comment still seem to disagree.
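The Utils helper whose comment is under discussion flattens a sequence of maps into one, with entries from later maps overriding earlier ones. A minimal sketch (the function name is hypothetical):

```scala
// Fold the maps left to right; Map.++ keeps the right-hand value on key
// collisions, so later maps in propList take precedence.
def flattenProps(propList: Seq[Map[String, String]]): Map[String, String] =
  propList.foldLeft(Map.empty[String, String])(_ ++ _)
```

This is the "later wins" semantics the doc comment should describe, which is presumably the code/comment disagreement vanzin is flagging.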
[GitHub] spark pull request: Spark Core - [SPARK-3620] - Refactor of SparkS...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2516#issuecomment-59251504 @tigerquoll you'll need to merge this with current master, since there are conflicts. You may be able to clean up some code since the PR I mentioned before is now checked in.
[GitHub] spark pull request: SPARK-3926 [CORE] Result of JavaRDD.collectAsM...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2805#issuecomment-59251747 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21769/consoleFull) for PR 2805 at commit [`ecb78ee`](https://github.com/apache/spark/commit/ecb78eef6bd00d1d13206fafb6e0d8b4c565ebf3). * This patch **passes all tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: SPARK-3926 [CORE] Result of JavaRDD.collectAsM...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2805#issuecomment-59251760 Test PASSed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21769/ Test PASSed.
[GitHub] spark pull request: [WIP][SPARK-3795] Heuristics for dynamically s...
Github user pwendell commented on a diff in the pull request: https://github.com/apache/spark/pull/2746#discussion_r18912692 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ExecutorAllocationManager.scala --- @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable + +import org.apache.spark.{Logging, SparkException} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend + +/** + * An agent that dynamically allocates and removes executors based on the workload. + * + * The add policy depends on the number of pending tasks. If the queue of pending tasks is not + * drained in N seconds, then new executors are added. If the queue persists for another M + * seconds, then more executors are added and so on. The number added in each round increases + * exponentially from the previous round until an upper bound on the number of executors has + * been reached. + * + * The rationale for the exponential increase is twofold: (1) Executors should be added slowly + * in the beginning in case the number of extra executors needed turns out to be small. 
Otherwise, + * we may add more executors than we need just to remove them later. (2) Executors should be added + * quickly over time in case the maximum number of executors is very high. Otherwise, it will take + * a long time to ramp up under heavy workloads. + * + * The remove policy is simpler: If an executor has been idle for K seconds (meaning it has not + * been scheduled to run any tasks), then it is removed. This requires starting a timer on each + * executor instead of just starting a global one as in the add case. + * + * Both add and remove attempts are retried on failure up to a maximum number of times. + * + * The relevant Spark properties include the following: + * + * spark.dynamicAllocation.enabled - Whether this feature is enabled + * spark.dynamicAllocation.minExecutors - Lower bound on the number of executors + * spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors + * + * spark.dynamicAllocation.addExecutorThreshold - How long before new executors are added (N) + * spark.dynamicAllocation.addExecutorInterval - How often to add new executors (M) --- End diff -- just so I understand - are the semantics of this that once we hit a state of continued backlog (e.g. we are adding executors, we will continue to add executors every M seconds)? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
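The exponential ramp-up described in the doc comment above can be sketched as a small simulation: each round requests double the executors of the previous round, capped at the upper bound. Parameter names are illustrative, not the actual Spark config keys:

```scala
// Simulate the add policy: round 1 adds 1 executor, round 2 adds 2,
// round 3 adds 4, ... capped at maxExecutors.
def executorsAfterRounds(rounds: Int, maxExecutors: Int): Int = {
  var total = 0
  var toAdd = 1
  for (_ <- 1 to rounds) {
    total = math.min(total + toAdd, maxExecutors)
    toAdd *= 2 // exponential increase round over round
  }
  total
}
```

After three rounds the count is 1 + 2 + 4 = 7, matching the "slow start, fast catch-up" rationale: small clusters stop early, large ones ramp quickly toward the cap.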
[GitHub] spark pull request: Remove Bytecode Inspection for Join Eliminatio...
GitHub user jegonzal opened a pull request: https://github.com/apache/spark/pull/2815 Remove Bytecode Inspection for Join Elimination Removing bytecode inspection from triplet operations and introducing explicit join elimination flags. The explicit flags make the join elimination more robust. You can merge this pull request into a Git repository by running: $ git pull https://github.com/jegonzal/spark SPARK-3936 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/2815.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2815 commit 2e471584d70aa8029a7eab9643cfdcb3e758a9d7 Author: Joseph E. Gonzalez joseph.e.gonza...@gmail.com Date: 2014-10-15T19:41:24Z Removing bytecode inspection from triplet operations and introducing explicit join elimination flags.
[GitHub] spark pull request: Remove Bytecode Inspection for Join Eliminatio...
Github user jegonzal commented on the pull request: https://github.com/apache/spark/pull/2815#issuecomment-59263992 @ankurdave and @rxin I have not updated the applications to use the new explicit flags. I will do that in this PR pending approval for the API changes.
[GitHub] spark pull request: [SPARK-3606] [yarn] Correctly configure AmIpFi...
Github user vanzin commented on the pull request: https://github.com/apache/spark/pull/2497#issuecomment-59265338 Friendly ping.
[GitHub] spark pull request: Remove Bytecode Inspection for Join Eliminatio...
Github user Ishiihara commented on a diff in the pull request: https://github.com/apache/spark/pull/2815#discussion_r18917918 --- Diff: graphx/src/main/scala/org/apache/spark/graphx/Graph.scala --- @@ -195,6 +195,12 @@ abstract class Graph[VD: ClassTag, ED: ClassTag] protected () extends Serializab * the underlying index structures can be reused. * * @param map the function from an edge object to a new edge value. + * @param mapUsesSrcAttr indicates whether the source vertex attribute should be included in + * the triplet. Setting this to false can improve performance if the source vertex attribute + * is not needed. + * @param mapUsesSrcAttr indicates whether the destination vertex attribute should be included in --- End diff -- mapUsesSrcAttr -> mapUsesDstAttr
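The flags discussed in this diff can be illustrated with a minimal sketch. This is not the actual GraphX API — `Triplet` and `buildTriplet` are hypothetical names — but it shows the idea behind explicit join elimination: a vertex attribute is only joined into the triplet when the caller declares the map function will read it, so an unused side of the join can be skipped entirely.

```scala
// Hedged sketch: illustrative names only, not GraphX's real classes.
// A triplet whose vertex attributes are optional: None means the
// corresponding vertex-attribute join was eliminated.
case class Triplet[VD, ED](srcAttr: Option[VD], dstAttr: Option[VD], attr: ED)

// Hypothetical helper: materializes a vertex attribute only when the
// corresponding flag says the map function needs it.
def buildTriplet[VD, ED](src: VD, dst: VD, edge: ED,
                         mapUsesSrcAttr: Boolean,
                         mapUsesDstAttr: Boolean): Triplet[VD, ED] =
  Triplet(
    if (mapUsesSrcAttr) Some(src) else None, // src-side join eliminated when false
    if (mapUsesDstAttr) Some(dst) else None, // dst-side join eliminated when false
    edge)

// Caller declares it only reads the source attribute, so no dst-side join
// is performed.
val t = buildTriplet("a", "b", 1.0, mapUsesSrcAttr = true, mapUsesDstAttr = false)
```

The contrast with the bytecode-inspection approach this PR removes is that the caller states intent explicitly, rather than the engine inferring it from the compiled closure.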
[GitHub] spark pull request: Remove Bytecode Inspection for Join Eliminatio...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2815#issuecomment-59265527 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21771/consoleFull) for PR 2815 at commit [`d32ec1c`](https://github.com/apache/spark/commit/d32ec1c3a2022b86e9c5e5b9f818084bb43b3088). * This patch merges cleanly.
[GitHub] spark pull request: Remove Bytecode Inspection for Join Eliminatio...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2815#issuecomment-59265670 [QA tests have finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21771/consoleFull) for PR 2815 at commit [`d32ec1c`](https://github.com/apache/spark/commit/d32ec1c3a2022b86e9c5e5b9f818084bb43b3088). * This patch **fails Scala style tests**. * This patch merges cleanly. * This patch adds no public classes.
[GitHub] spark pull request: Remove Bytecode Inspection for Join Eliminatio...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2815#issuecomment-59265673 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21771/ Test FAILed.
[GitHub] spark pull request: [SPARK-3569][SQL] Add metadata field to Struct...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2701#issuecomment-59266259 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21772/consoleFull) for PR 2701 at commit [`4266f4d`](https://github.com/apache/spark/commit/4266f4dd4df4b006d3a54144558cb92bf46003a7). * This patch merges cleanly.
[GitHub] spark pull request: Remove Bytecode Inspection for Join Eliminatio...
Github user AmplabJenkins commented on the pull request: https://github.com/apache/spark/pull/2815#issuecomment-59266720 Test FAILed. Refer to this link for build results (access rights to CI server needed): https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/21770/ Test FAILed.
[GitHub] spark pull request: [SPARK-3939][SQL] NPE caused by SessionState.o...
Github user adrian-wang commented on the pull request: https://github.com/apache/spark/pull/2812#issuecomment-59270336 retest this please.
[GitHub] spark pull request: [SPARK-3939][SQL] NPE caused by SessionState.o...
Github user SparkQA commented on the pull request: https://github.com/apache/spark/pull/2812#issuecomment-59271430 [QA tests have started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/21773/consoleFull) for PR 2812 at commit [`e89a7c2`](https://github.com/apache/spark/commit/e89a7c26fd095a1aea380860e0a45f80d5fcbd6b). * This patch merges cleanly.
[GitHub] spark pull request: [WIP][SPARK-3795] Heuristics for dynamically s...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/2746#discussion_r18920574 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ExecutorAllocationManager.scala --- @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable + +import org.apache.spark.{Logging, SparkException} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend + +/** + * An agent that dynamically allocates and removes executors based on the workload. + * + * The add policy depends on the number of pending tasks. If the queue of pending tasks is not + * drained in N seconds, then new executors are added. If the queue persists for another M + * seconds, then more executors are added and so on. The number added in each round increases + * exponentially from the previous round until an upper bound on the number of executors has + * been reached. + * + * The rationale for the exponential increase is twofold: (1) Executors should be added slowly + * in the beginning in case the number of extra executors needed turns out to be small. 
Otherwise, + * we may add more executors than we need just to remove them later. (2) Executors should be added + * quickly over time in case the maximum number of executors is very high. Otherwise, it will take + * a long time to ramp up under heavy workloads. + * + * The remove policy is simpler: If an executor has been idle for K seconds (meaning it has not + * been scheduled to run any tasks), then it is removed. This requires starting a timer on each + * executor instead of just starting a global one as in the add case. + * + * Both add and remove attempts are retried on failure up to a maximum number of times. + * + * The relevant Spark properties include the following: + * --- End diff -- The number of configuration options here is a little scary. Do we need all of these things to be configurable? The first 3 seem like things we definitely need. Some of the remaining things seem very tied to how long it takes a new executor to start up, and I wonder if we could hardcode them for now, and make them configurable later only if people find the defaults to be problematic.
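The exponential add policy quoted in this diff can be sketched in a few lines. This is an illustrative model, not the patch's actual implementation: each round requests twice as many executors as the previous round, capped so the total never exceeds the upper bound. All names (`rampUp`, `lastAdd`) are hypothetical.

```scala
// Hedged sketch of the exponential ramp-up described in the scaladoc above.
// Given the current executor count and the size of the last add round,
// returns the new count and the next round's add size.
def rampUp(current: Int, lastAdd: Int, maxExecutors: Int): (Int, Int) = {
  // Double the previous round's request, but never exceed the upper bound.
  val toAdd = math.min(lastAdd * 2, maxExecutors - current)
  // Reset the round size to 1 once the bound is hit, so a later shrink
  // can be followed by a fresh slow start.
  (current + toAdd, math.max(toAdd, 1))
}

// Starting from 2 executors with an initial round size of 1 and a cap of 16:
// rounds add 2, 4, 8, then 0 (cap reached).
var state = (2, 1)
for (_ <- 1 to 4) state = rampUp(state._1, state._2, 16)
```

This shows the rationale in the comment directly: early rounds are small (cheap to undo if the demand was transient), while later rounds grow quickly so a large cluster does not take many rounds to reach.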
[GitHub] spark pull request: [WIP][SPARK-3795] Heuristics for dynamically s...
Github user kayousterhout commented on a diff in the pull request: https://github.com/apache/spark/pull/2746#discussion_r18920718 --- Diff: core/src/main/scala/org/apache/spark/scheduler/ExecutorAllocationManager.scala --- @@ -0,0 +1,496 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import scala.collection.mutable + +import org.apache.spark.{Logging, SparkException} +import org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend + +/** + * An agent that dynamically allocates and removes executors based on the workload. + * + * The add policy depends on the number of pending tasks. If the queue of pending tasks is not + * drained in N seconds, then new executors are added. If the queue persists for another M + * seconds, then more executors are added and so on. The number added in each round increases + * exponentially from the previous round until an upper bound on the number of executors has + * been reached. + * + * The rationale for the exponential increase is twofold: (1) Executors should be added slowly + * in the beginning in case the number of extra executors needed turns out to be small. 
Otherwise, + * we may add more executors than we need just to remove them later. (2) Executors should be added + * quickly over time in case the maximum number of executors is very high. Otherwise, it will take + * a long time to ramp up under heavy workloads. + * + * The remove policy is simpler: If an executor has been idle for K seconds (meaning it has not + * been scheduled to run any tasks), then it is removed. This requires starting a timer on each + * executor instead of just starting a global one as in the add case. + * + * Both add and remove attempts are retried on failure up to a maximum number of times. + * + * The relevant Spark properties include the following: + * + * spark.dynamicAllocation.enabled - Whether this feature is enabled + * spark.dynamicAllocation.minExecutors - Lower bound on the number of executors + * spark.dynamicAllocation.maxExecutors - Upper bound on the number of executors + * + * spark.dynamicAllocation.addExecutorThreshold - How long before new executors are added (N) + * spark.dynamicAllocation.addExecutorInterval - How often to add new executors (M) + * spark.dynamicAllocation.removeExecutorThreshold - How long before an executor is removed (K) + * + * spark.dynamicAllocation.addExecutorRetryInterval - How often to retry adding executors + * spark.dynamicAllocation.removeExecutorRetryInterval - How often to retry removing executors + * spark.dynamicAllocation.maxAddExecutorRetryAttempts - Max retries in re-adding executors + * spark.dynamicAllocation.maxRemoveExecutorRetryAttempts - Max retries in re-removing executors + * + * Synchronization: Because the schedulers in Spark are single-threaded, contention should only + * arise when new executors register or when existing executors have been removed, both of which + * are relatively rare events with respect to task scheduling. 
Thus, synchronizing each method on + * the same lock should not be expensive assuming biased locking is enabled in the JVM (on by + * default for Java 6+). This may not be true, however, if the application itself runs multiple + * jobs concurrently. + * + * Note: This is part of a larger implementation (SPARK-3174) and currently does not actually + * request to add or remove executors. The mechanism to actually do this will be added separately, + * e.g. in SPARK-3822 for Yarn. + */ +private[scheduler] class ExecutorAllocationManager(scheduler: TaskSchedulerImpl) extends Logging { + private val conf = scheduler.conf + + // Lower and upper bounds on the number of executors. These are required. + private val minNumExecutors = conf.getInt(spark.dynamicAllocation.minExecutors, -1) + private val maxNumExecutors =
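The truncated snippet above reads each `spark.dynamicAllocation.*` property with a sentinel default. A minimal stand-in for that pattern, using a plain `Map` instead of Spark's `SparkConf` (the property values and the `-1` sentinel for "required but unset" follow the snippet; the helper name `getInt` mirrors `SparkConf.getInt` but everything else here is illustrative):

```scala
// Hedged sketch: a SparkConf-like lookup over a plain Map, showing how the
// dynamic-allocation bounds might be read with defaults. Values are examples.
val props = Map(
  "spark.dynamicAllocation.enabled" -> "true",
  "spark.dynamicAllocation.minExecutors" -> "2")

// Look up an integer property, falling back to a default when unset.
def getInt(conf: Map[String, String], key: String, default: Int): Int =
  conf.get(key).map(_.toInt).getOrElse(default)

val enabled = props.getOrElse("spark.dynamicAllocation.enabled", "false").toBoolean
val minNumExecutors = getInt(props, "spark.dynamicAllocation.minExecutors", -1)
// Unset here, so the -1 sentinel signals "required bound missing" and the
// real manager could raise a SparkException during validation.
val maxNumExecutors = getInt(props, "spark.dynamicAllocation.maxExecutors", -1)
```

Using `-1` as the default lets the manager distinguish "not configured" from any legal bound, which is why the quoted code calls these two properties required.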