[GitHub] spark pull request: [SPARK-8137][core] Improve treeAggregate to co...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7461#issuecomment-122347375 Small thing, but can the title of this PR be changed to "...combine all data on *each* executor..." to clarify that all the data isn't going to a single executor? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8674] [MLlib] [WIP] Implementation of a...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7075#issuecomment-121080323 Hey @josepablocam can you rebase this on current master?
[GitHub] spark pull request: [SPARK-8958] Dynamic allocation: change cached...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7329#issuecomment-120138836 LGTM
[GitHub] spark pull request: [SPARK-8884] [MLlib] [WIP] 1-sample Anderson-D...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7278#issuecomment-120179329 It's also worth mentioning that it's implemented in the common stats packages: scipy (http://docs.scipy.org/doc/scipy-0.15.1/reference/generated/scipy.stats.anderson.html), Matlab (http://www.mathworks.com/help/stats/adtest.html), and SAS (http://support.sas.com/documentation/cdl/en/procstat/63104/HTML/default/viewer.htm#procstat_univariate_sect037.htm)
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r34200738

--- Diff: mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -153,4 +157,101 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
       Statistics.chiSqTest(sc.parallelize(continuousFeature, 2))
     }
   }
+
+  test("1 sample Kolmogorov-Smirnov test") {
+    // Create theoretical distributions
+    val stdNormalDist = new NormalDistribution(0, 1)
+    val expDist = new ExponentialDistribution(0.6)
+    val unifDist = new UniformRealDistribution()
+
+    // set seeds
+    val seed = 10L
+    stdNormalDist.reseedRandomGenerator(seed)
+    expDist.reseedRandomGenerator(seed)
+    unifDist.reseedRandomGenerator(seed)
+
+    // Sample data from the distributions and parallelize it
+    val n = 10
+    val sampledNorm = sc.parallelize(stdNormalDist.sample(n), 10)
+    val sampledExp = sc.parallelize(expDist.sample(n), 10)
+    val sampledUnif = sc.parallelize(unifDist.sample(n), 10)
+
+    // Use an Apache Commons Math local KS test to verify calculations
+    val ksTest = new KolmogorovSmirnovTest()
+    val pThreshold = 0.05
+
+    // Comparing a standard normal sample to a standard normal distribution
+    val result1 = Statistics.ksTest(sampledNorm, "stdnorm")
+    val referenceStat1 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, sampledNorm.collect())
+    val referencePVal1 = 1 - ksTest.cdf(referenceStat1, n)
+    // Verify vs Apache Commons Math KS test
+    assert(result1.statistic === referenceStat1)
+    assert(result1.pValue === referencePVal1)
+    // Cannot reject null hypothesis
+    assert(result1.pValue > pThreshold)
+
+    // Comparing an exponential sample to a standard normal distribution
+    val result2 = Statistics.ksTest(sampledExp, "stdnorm")
+    val referenceStat2 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, sampledExp.collect())
+    val referencePVal2 = 1 - ksTest.cdf(referenceStat2, n)
+    // verify vs Apache Commons Math KS test
+    assert(result2.statistic === referenceStat2)
+    assert(result2.pValue === referencePVal2)
+    // reject null hypothesis
+    assert(result2.pValue < pThreshold)
+
+    // Testing the use of a user-provided CDF function
+    // Distribution is not serializable, so will have to create in the lambda
+    val expCDF = (x: Double) => new ExponentialDistribution(0.2).cumulativeProbability(x)
+
+    // Comparing an exponential sample with mean X to an exponential distribution with mean Y
+    // where X != Y
+    val result3 = Statistics.ksTest(sampledExp, expCDF)
+    val referenceStat3 = ksTest.kolmogorovSmirnovStatistic(new ExponentialDistribution(0.2),
+      sampledExp.collect())
+    val referencePVal3 = 1 - ksTest.cdf(referenceStat3, sampledNorm.count().toInt)
+    // verify vs Apache Commons Math KS test
+    assert(result3.statistic === referenceStat3)
+    assert(result3.pValue === referencePVal3)
+    // reject null hypothesis
+    assert(result3.pValue < pThreshold)
+
+    /*
+      Comparing results with R's implementation of Kolmogorov-Smirnov for 1 sample
+      sessionInfo()
+      R version 3.2.0 (2015-04-16)
+      Platform: x86_64-apple-darwin13.4.0 (64-bit)
+      set.seed(20)
+      v <- rnorm(20)
+      v
+       [1]  1.16268529 -0.58592447  1.78546500 -1.33259371 -0.44656677  0.56960612
+       [7] -2.88971761 -0.86901834 -0.46170268 -0.4091 -0.02013537 -0.15038222
+      [13] -0.62812676  1.32322085 -1.52135057 -0.43742787  0.97057758  0.02822264
+      [19] -0.08578219  0.38921440
+      ks.test(v, "pnorm", alternative = "two.sided")
+
+        One-sample Kolmogorov-Smirnov test
+
+      data:  v
+      D = 0.18874, p-value = 0.4223
+      alternative hypothesis: two-sided
+    */
+
+    val RKSStat = 0.18874
+    val RKSPVal = 0.4223
+    val RData = sc.parallelize(
+      Array(
+        1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
+        -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
+        -0.461702683149641, -0.40910137444, -0.0201353678515895, -0.150382224136063,
+        -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
+        0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942
+      )
+    )
+    val RCompResult = Statistics.ksTest(RData, "stdnorm")
+    assert(RCompResult.statistic ~== RKSStat relTol 1e-4)
+    assert(RCompResult.pValue ~== RKSPVal relTol 1e-4)

--- End diff --
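The statistic the suite above cross-checks against R can be reproduced directly: D = sup_x |F_n(x) - F(x)|, evaluated at the order statistics, where the empirical CDF jumps from (i-1)/n to i/n. A minimal pure-Python sketch (function names `ks_statistic` and `std_normal_cdf` are mine, not Spark's or Commons Math's), run on the 20 values quoted from the R session:

```python
import math

def std_normal_cdf(x):
    """CDF of the standard normal, via the error function."""
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))

def ks_statistic(sample, cdf):
    """One-sample KS statistic D = sup_x |F_n(x) - F(x)|.
    At the i-th order statistic the empirical CDF jumps from (i-1)/n to i/n,
    so the supremum is attained at one of those two gaps."""
    xs = sorted(sample)
    n = len(xs)
    d = 0.0
    for i, x in enumerate(xs, start=1):
        f = cdf(x)
        d = max(d, i / n - f, f - (i - 1) / n)
    return d

# The 20 values quoted above (R: set.seed(20); v <- rnorm(20))
r_data = [
    1.1626852897838, -0.585924465893051, 1.78546500331661, -1.33259371048501,
    -0.446566766553219, 0.569606122374976, -2.88971761441412, -0.869018343326555,
    -0.461702683149641, -0.40910137444, -0.0201353678515895, -0.150382224136063,
    -0.628126755843964, 1.32322085193283, -1.52135057001199, -0.437427868856691,
    0.970577579543399, 0.0282226444247749, -0.0857821886527593, 0.389214404984942,
]
d = ks_statistic(r_data, std_normal_cdf)  # R's ks.test reports D = 0.18874 for this data
```

The suite's p-value comes from the Kolmogorov distribution (Commons Math's `ksTest.cdf(stat, n)`), which this sketch does not reimplement.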
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r34201295

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/ADTest.scala ---
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson-Darling (AD) test, similarly to the Kolmogorov-Smirnov (KS) test, tests whether
+ * the data follow a given theoretical distribution. It should be used with continuous data and
+ * assumes that no ties occur (the presence of ties can affect the validity of the test).
+ * The AD test provides an alternative to the Kolmogorov-Smirnov test; namely, it is better
+ * suited to identifying departures from the theoretical distribution at the tails.
+ * It is worth noting that the AD test's critical values depend on the
+ * distribution being tested against.
+ * The AD statistic is defined as -n - s/n, where
+ * s = sum from i=1 to n of (2i - 1)(ln(z_i) + ln(1 - z_{n+1-i})),
+ * where z_i is the CDF value of the ith observation in the sorted sample.
+ * For more information @see [[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object ADTest {
+
+  object NullHypothesis extends Enumeration {
+    type NullHypothesis = Value
+    val oneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * ADTheoreticalDist is a trait that every distribution used in an AD test must extend.
+   * The rationale for this is that the AD test has distribution-dependent critical values, and by
+   * requiring extension of this trait we guarantee that future additional distributions
+   * make sure to add the appropriate critical values (CVs) (or at least acknowledge
+   * that they should be added).
+   */
+  sealed trait ADTheoreticalDist {
+    val params: Array[Double] // parameters used to initialize the distribution
+
+    def cdf(x: Double): Double // calculate the CDF under the given distribution for value x
+
+    def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, adjusted for sample size
+  }
+
+  /**
+   * Sourced from
+   * http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs-
+   * test.pdf
+   * https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017
+   */
+
+  // Exponential distribution
+  class ADExponential(val params: Array[Double]) extends ADTheoreticalDist {
+    private val theoretical = new ExponentialDistribution(params(0))
+
+    private val rawCVs = ListMap(
+      0.15 -> 0.922, 0.10 -> 1.078,
+      0.05 -> 1.341, 0.025 -> 1.606, 0.01 -> 1.957
+    )
+
+    def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+    def getCVs(n: Double): Map[Double, Double] = {
+      rawCVs.map { case (sig, cv) => sig -> cv / (1 + 0.6 / n) }
+    }
+  }
+
+  // Normal distribution
+  class ADNormal(val params: Array[Double]) extends ADTheoreticalDist {
+    private val theoretical = new NormalDistribution(params(0), params(1))
+
+    private val rawCVs = ListMap(
+      0.15 -> 0.576, 0.10 -> 0.656,
+      0.05 -> 0.787, 0.025 -> 0.918, 0.01 -> 1.092
+    )
+
+    def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+    def getCVs(n: Double): Map[Double, Double] = {
+      rawCVs.map { case (sig, cv) => sig -> cv / (1 + 4.0 / n - 25.0 / (n * n)) }
+    }
+  }
+
+  // Gumbel distribution
+  class ADGumbel(val params: Array[Double]) extends ADTheoreticalDist {
+    private val theoretical = new GumbelDistribution(params(0), params(1
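The A² formula quoted in the doc comment above (A² = -n - s/n with s = Σ_{i=1..n} (2i - 1)(ln z_i + ln(1 - z_{n+1-i}))) can be sketched in a few lines of pure Python. This is an illustration of the formula only, not the PR's Scala implementation; the function name `ad_statistic` is mine:

```python
import math

def ad_statistic(sample, cdf):
    """Anderson-Darling statistic A^2 = -n - s/n, where
    s = sum_{i=1}^{n} (2i - 1) * (ln z_i + ln(1 - z_{n+1-i}))
    and z_i is the CDF value of the i-th smallest observation."""
    z = sorted(cdf(x) for x in sample)
    n = len(z)
    s = sum((2 * i - 1) * (math.log(z[i - 1]) + math.log(1.0 - z[n - i]))
            for i in range(1, n + 1))
    return -n - s / n

# Example: a small sample checked against the uniform(0, 1) CDF (identity on [0, 1])
a2 = ad_statistic([0.1, 0.3, 0.5, 0.7, 0.9], lambda x: x)
```

Since this sum is an exact evaluation of the nonnegative integral n ∫ (F_n - F)² / (F(1 - F)) dF, the statistic is always ≥ 0; large values indicate departure from the hypothesized distribution, especially at the tails.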
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r34200807 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala --- (quotes the same KS test hunk shown above, at the comment block `/* Comparing results with R's implementation of Kolmogorov-Smirnov for 1 sample`) --- End diff -- Indent this block an extra space
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r34200842 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala --- (quotes the same KS test hunk shown above, at `val RKSStat = 0.18874`) --- End diff -- lower case variable names
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r34200828 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala --- (quotes the same KS test hunk shown above, at the `Array(` inside `val RData = sc.parallelize(`) --- End diff -- indent this back a space
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r34196348

--- Diff: mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -153,4 +153,166 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
       Statistics.chiSqTest(sc.parallelize(continuousFeature, 2))
     }
   }
+
+  test("Anderson-Darling test for 1 sample") {
+    /*
+      Data generated with R
+      sessionInfo() # truncated
+        R version 3.2.0 (2015-04-16)
+        Platform: x86_64-apple-darwin13.4.0 (64-bit)
+        Running under: OS X 10.10.2 (Yosemite)
+      set.seed(10)
+      dataNorm <- rnorm(20)
+      dataExp <- rexp(20)
+      dataUnif <- runif(20)
+      mean(dataNorm)
+      [1] -0.06053267
+      sd(dataNorm)
+      [1] 0.7999093
+      mean(dataExp)
+      [1] 1.044636
+      sd(dataExp)
+      [1] 0.96727
+      mean(dataUnif)
+      [1] 0.4420219
+      sd(dataUnif)
+      [1] 0.2593285
+    */
+
+    val dataNorm = sc.parallelize(
+      Array(0.0187461709418264, -0.184252542069064, -1.37133054992251,
+        -0.599167715783718, 0.294545126567508, 0.389794300700167, -1.20807617542949,
+        -0.363676017470862, -1.62667268170309, -0.256478394123992, 1.10177950308713,
+        0.755781508027337, -0.238233556018718, 0.98744470341339, 0.741390128383824,
+        0.0893472664958216, -0.954943856152377, -0.195150384667239, 0.92552126209408,
+        0.482978524836611)
+    )
+    val dataExp = sc.parallelize(
+      Array(0.795082630547595, 1.39629918233218, 1.39810742601556, 1.11045944034578,
+        0.170421596598791, 1.91878133072498, 0.166443939786404, 0.97028998914142, 0.010571192484349,
+        2.79300971312409, 2.35461177957702, 0.667238388210535, 0.522243486717343, 0.146712897811085,
+        0.751234306178963, 2.2885662248, 0.0688535687513649, 0.282713153399527,
+        0.0514786350540817, 3.02959313971882)
+    )
+    val dataUnif = sc.parallelize(
+      Array(0.545859839767218, 0.372763097286224, 0.961302414536476, 0.257341569056734,
+        0.207951683318242, 0.861382439732552, 0.464391982648522, 0.222867433447391,
+        0.623549601528794, 0.203647700604051, 0.0196734135970473, 0.797993005951867,
+        0.274318896699697, 0.166609104024246, 0.170151718193665, 0.4885059366934,
+        0.711409077281132, 0.591934921452776, 0.517156876856461, 0.381627685856074)
+    )
+
+    /* normality test in R
+       library(nortest)

--- End diff --

Can you indent inside this comment block a little further
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r34196274

--- Diff: mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -153,4 +153,166 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
       Statistics.chiSqTest(sc.parallelize(continuousFeature, 2))
     }
   }
+
+  test("Anderson-Darling test for 1 sample") {
+    /*
+      Data generated with R
+      sessionInfo() # truncated
+        R version 3.2.0 (2015-04-16)
+        Platform: x86_64-apple-darwin13.4.0 (64-bit)
+        Running under: OS X 10.10.2 (Yosemite)
+      set.seed(10)
+      dataNorm <- rnorm(20)
+      dataExp <- rexp(20)
+      dataUnif <- runif(20)
+      mean(dataNorm)
+      [1] -0.06053267
+      sd(dataNorm)
+      [1] 0.7999093
+      mean(dataExp)
+      [1] 1.044636
+      sd(dataExp)
+      [1] 0.96727
+      mean(dataUnif)
+      [1] 0.4420219
+      sd(dataUnif)
+      [1] 0.2593285
+    */
+
+    val dataNorm = sc.parallelize(
+      Array(0.0187461709418264, -0.184252542069064, -1.37133054992251,
+        -0.599167715783718, 0.294545126567508, 0.389794300700167, -1.20807617542949,
+        -0.363676017470862, -1.62667268170309, -0.256478394123992, 1.10177950308713,
+        0.755781508027337, -0.238233556018718, 0.98744470341339, 0.741390128383824,
+        0.0893472664958216, -0.954943856152377, -0.195150384667239, 0.92552126209408,
+        0.482978524836611)
+    )
+    val dataExp = sc.parallelize(
+      Array(0.795082630547595, 1.39629918233218, 1.39810742601556, 1.11045944034578,
+        0.170421596598791, 1.91878133072498, 0.166443939786404, 0.97028998914142, 0.010571192484349,
+        2.79300971312409, 2.35461177957702, 0.667238388210535, 0.522243486717343, 0.146712897811085,
+        0.751234306178963, 2.2885662248, 0.0688535687513649, 0.282713153399527,
+        0.0514786350540817, 3.02959313971882)
+    )
+    val dataUnif = sc.parallelize(
+      Array(0.545859839767218, 0.372763097286224, 0.961302414536476, 0.257341569056734,
+        0.207951683318242, 0.861382439732552, 0.464391982648522, 0.222867433447391,
+        0.623549601528794, 0.203647700604051, 0.0196734135970473, 0.797993005951867,
+        0.274318896699697, 0.166609104024246, 0.170151718193665, 0.4885059366934,
+        0.711409077281132, 0.591934921452776, 0.517156876856461, 0.381627685856074)
+    )
+
+    /* normality test in R
+       library(nortest)
+       ad.test(datNorm)
+
+         Anderson-Darling normality test
+
+       data:  datNorm
+       A = 0.27523, p-value = 0.6216
+
+       ad.test(datExp)
+
+         Anderson-Darling normality test
+
+       data:  datExp
+       A = 0.79034, p-value = 0.03336
+
+       ad.test(datUnif)
+
+         Anderson-Darling normality test
+
+       data:  datUnif
+       A = 0.31831, p-value = 0.5114
+    */
+
+    val RNormADStats = Map("norm" -> 0.27523, "exp" -> 0.79034, "unif" -> 0.31831)
+    val RParams = Map(
+      "norm" -> (-0.06053267, 0.7999093),
+      "exp" -> (1.044636, 0.96727),
+      "unif" -> (0.4420219, 0.2593285)
+    )
+
+    val ADTestNormNorm = ADTest.testOneSample(dataNorm, "norm")
+    assert(ADTestNormNorm.statistic ~== RNormADStats("norm") relTol 1e-4)
+    val ADTestExpNorm = Statistics.andersonDarlingTest(dataExp, "norm")
+    assert(ADTestExpNorm.statistic ~== RNormADStats("exp") relTol 1e-4)
+    val ADTestUnifNorm = Statistics.andersonDarlingTest(dataUnif, "norm")
+    assert(ADTestUnifNorm.statistic ~== RNormADStats("unif") relTol 1e-4)
+
+    // Testing passing in parameters estimated in R vs estimating directly
+    assert(
+      Statistics.andersonDarlingTest(
+        dataNorm,
+        "norm",
+        RParams("norm")._1,
+        RParams("norm")._2
+      ).statistic
+      ~== ADTestNormNorm.statistic relTol 1e-4
+    )
+    assert(
+      Statistics.andersonDarlingTest(
+        dataExp,
+        "norm",
+        RParams("exp")._1,
+        RParams("exp")._2
+      ).statistic
+      ~== ADTestExpNorm.statistic relTol 1e-4
+    )
+    assert(
+      Statistics.andersonDarlingTest(
+        dataUnif,
+        "norm",
+        RParams("unif")._1,
+        RParams("unif")._2
+      ).statistic
+      ~== ADTestUnifNorm.statistic relTol 1e-4
+    )
+
+    /*
+      normality test in scipy: comparing critical values
+      from scipy.stats import anderson
+      # drop in values as arrays
+      ...
+      anderson(dataNorm, 'norm')
+      (0.27523090925717852, array
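The sample-size adjustment applied by the PR's `getCVs` for the normal case (divide each raw critical value by 1 + 4/n - 25/n²) is easy to reproduce. A hedged sketch (names `RAW_NORMAL_CVS` and `adjusted_normal_cvs` are mine; the raw values match the `ListMap` quoted in the `ADNormal` class above):

```python
# Raw asymptotic critical values for the AD test against a normal distribution,
# keyed by significance level, as in the ADNormal class quoted earlier.
RAW_NORMAL_CVS = {0.15: 0.576, 0.10: 0.656, 0.05: 0.787, 0.025: 0.918, 0.01: 1.092}

def adjusted_normal_cvs(n):
    """Scale each critical value for sample size n: cv / (1 + 4/n - 25/n^2).
    As n grows the factor tends to 1, recovering the asymptotic values."""
    factor = 1.0 + 4.0 / n - 25.0 / (n * n)
    return {sig: cv / factor for sig, cv in RAW_NORMAL_CVS.items()}

cvs = adjusted_normal_cvs(20)
# reject normality at the 5% level when the A^2 statistic exceeds cvs[0.05]
```

For n = 20 the factor is 1.1375 > 1, so each small-sample critical value is slightly below its asymptotic counterpart.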
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r34201231 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/ADTest.scala --- @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import collection.immutable.ListMap + +import org.apache.commons.math3.distribution.{ExponentialDistribution, GumbelDistribution, + LogisticDistribution, NormalDistribution, WeibullDistribution} + +import org.apache.spark.rdd.RDD + +/** + * The Anderson Darling (AD) test, similarly to the Kolmogorov Smirnov (KS) test, tests whether the + * data follow a given theoretical distribution. It should be used with continuous data and + * assumes that no ties occur (the presence of ties can affect the validity of the test). + * The AD test provides an alternative to the Kolmogorov-Smirnov test. Namely, it is better + * suited to identify departures from the theoretical distribution at the tails. + * It is worth noting that the the AD test's critical values depend on the + * distribution being tested against. 
+ * The AD statistic is defined as -n - s/n, where + * s = sum from i=1 to n of (2i - 1)(ln(z_i) + ln(1 - z_{n+1-i})) + * where z_i is the CDF value of the ith observation in the sorted sample. + * For more information @see[[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]] + */ +private[stat] object ADTest { + + object NullHypothesis extends Enumeration { +type NullHypothesis = Value +val oneSample = Value("Sample follows theoretical distribution.") + } + + /** + * ADTheoreticalDist is a trait that every distribution used in an AD test must extend. + * The rationale for this is that the AD test has distribution-dependent critical values, and by + * requiring extension of this trait we guarantee that future additional distributions + * make sure to add the appropriate critical values (CVs) (or at least acknowledge + * that they should be added) + */ + sealed trait ADTheoreticalDist { +val params: Array[Double] // parameters used to initialize the distribution + +def cdf(x: Double): Double // calculate the cdf under the given distribution for value x + +def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, adjusted for sample size + } + + /** + * Sourced from + * http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs- + * test.pdf + * https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017 + */ + + // Exponential distribution + class ADExponential(val params: Array[Double]) extends ADTheoreticalDist { +private val theoretical = new ExponentialDistribution(params(0)) + +private val rawCVs = ListMap( + 0.15 -> 0.922, 0.10 -> 1.078, + 0.05 -> 1.341, 0.025 -> 1.606, 0.01 -> 1.957 +) + +def cdf(x: Double): Double = theoretical.cumulativeProbability(x) + +def getCVs(n: Double): Map[Double, Double] = { + rawCVs.map { case (sig, cv) => sig -> cv / (1 + 0.6 / n) } +} + } + + // Normal Distribution + class ADNormal(val params: Array[Double]) extends ADTheoreticalDist { +private val theoretical = new 
NormalDistribution(params(0), params(1)) + +private val rawCVs = ListMap( + 0.15 -> 0.576, 0.10 -> 0.656, + 0.05 -> 0.787, 0.025 -> 0.918, 0.01 -> 1.092 +) + +def cdf(x: Double): Double = theoretical.cumulativeProbability(x) + +def getCVs(n: Double): Map[Double, Double] = { + rawCVs.map { case (sig, cv) => sig -> cv / (1 + 4.0 / n - 25.0 / (n * n)) } +} + } + + // Gumbel distribution + class ADGumbel(val params: Array[Double]) extends ADTheoreticalDist { +private val theoretical = new GumbelDistribution(params(0), params(1
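The statistic documented in the scaladoc above can be checked with a short standalone sketch (Python here so it runs on its own; `anderson_darling_statistic` and `normal_cdf` are illustrative names, not part of the PR's API):

```python
import math

def normal_cdf(x, mu=0.0, sigma=1.0):
    # CDF of N(mu, sigma^2), expressed via the error function
    return 0.5 * (1.0 + math.erf((x - mu) / (sigma * math.sqrt(2.0))))

def anderson_darling_statistic(sample, cdf=normal_cdf):
    # A^2 = -n - s/n, with s = sum_{i=1..n} (2i - 1)(ln z_i + ln(1 - z_{n+1-i})),
    # where z_i is the CDF value of the i-th smallest observation
    z = sorted(cdf(x) for x in sample)
    n = len(z)
    s = sum((2 * i - 1) * (math.log(z[i - 1]) + math.log(1.0 - z[n - i]))
            for i in range(1, n + 1))
    return -n - s / n
```

A roughly symmetric sample scored against the standard normal CDF yields a small statistic, while a clearly shifted sample yields a much larger one; the distribution-specific critical-value tables in the diff (with their sample-size adjustments) are then consulted to accept or reject the null hypothesis.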
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r34201204 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/ADTest.scala --- (same hunk as quoted above)
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34214315 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -627,6 +641,29 @@ private[spark] class ExecutorAllocationManager( def isExecutorIdle(executorId: String): Boolean = { !executorIdToTaskIds.contains(executorId) } + +/** + * Get the number of locality aware pending tasks and related locality preferences as the + * hints used for executor allocation. + */ +def executorPlacementHints(): (Int, Map[String, Int]) = + allocationManager.synchronized { + var localityAwarePendingTasks: Int = 0 + val localityToCount = new mutable.HashMap[String, Int]() + stageIdToPreferredLocations.values.foreach { localities => --- End diff -- Given that the values in stageIdToPreferredLocations don't change after the stage is submitted, it seems like we're going to be redoing a bunch of work each time we sync with the YarnAllocator. Can we save intermediate information to avoid this? Also, the way this works, we don't actually differentiate between pending, running, and completed tasks. All tasks for a stage are counted as pending as long as the stage is still running. How computationally expensive would it be to keep this up to date when tasks start and complete?
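The aggregation in the quoted hunk can be paraphrased in Python to make the recomputation cost concrete (names are illustrative stand-ins; the real code walks `stageIdToPreferredLocations` inside `allocationManager.synchronized`):

```python
def executor_placement_hints(stage_to_preferred_locations):
    # Returns (number of locality-aware pending tasks,
    #          host -> count of pending tasks preferring that host)
    locality_aware_tasks = 0
    host_to_count = {}
    for task_locations in stage_to_preferred_locations.values():
        for hosts in task_locations:      # preferred hosts of one task
            if hosts:                     # only locality-aware tasks count
                locality_aware_tasks += 1
                for h in hosts:
                    host_to_count[h] = host_to_count.get(h, 0) + 1
    return locality_aware_tasks, host_to_count
```

Because the whole map is rescanned on every call, caching per-stage partial counts, as the review comment suggests, would avoid redoing this work on each sync with the YarnAllocator.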
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34213732 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -509,6 +514,10 @@ private[spark] class ExecutorAllocationManager( // Number of tasks currently running on the cluster. Should be 0 when no stages are active. private var numRunningTasks: Int = _ +// stageId to preferred localities map, maintain the preferred node location of each task in --- End diff -- location -> locations
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34213924 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -627,6 +641,29 @@ private[spark] class ExecutorAllocationManager( def isExecutorIdle(executorId: String): Boolean = { !executorIdToTaskIds.contains(executorId) } + +/** + * Get the number of locality aware pending tasks and related locality preferences as the --- End diff -- Change this to: Get a tuple of (the total number of pending tasks with locality preferences, a map where each pair is a node and the number of pending tasks that would like to be scheduled on that node).
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34214220 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -627,6 +641,29 @@ private[spark] class ExecutorAllocationManager( def isExecutorIdle(executorId: String): Boolean = { !executorIdToTaskIds.contains(executorId) } + +/** + * Get the number of locality aware pending tasks and related locality preferences as the + * hints used for executor allocation. + */ +def executorPlacementHints(): (Int, Map[String, Int]) = + allocationManager.synchronized { + var localityAwarePendingTasks: Int = 0 --- End diff -- no need for `: Int`
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34214716 --- Diff: core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala --- @@ -66,6 +66,12 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp // Executors we have requested the cluster manager to kill that have not died yet private val executorsPendingToRemove = new HashSet[String] + // A map to store preferred locality with its required count + protected var preferredLocalityToCount: Map[String, Int] = Map.empty + + // The number of pending tasks which is locality required + protected var localityAwarePendingTasks: Int = 0 --- End diff -- Don't need `: Int`
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34215261 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,205 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) + +private[yarn] trait ContainerPlacementStrategy { + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * @param numLocalityAwarePendingTasks number of locality-aware pending tasks + * @param preferredLocalityToCounts a map to store the preferred hostname and possible task + * numbers running on it, used as hints for container allocation + * @return node localities and rack localities, each locality is an array of string, + * the length of localities is the same as number of containers + */ + def localityOfRequestedContainers( + numContainer: Int, + numLocalityAwarePendingTasks: Int, + preferredLocalityToCounts: Map[String, Int] +): Array[ContainerLocalityPreferences] +} + +/** + * This strategy calculates the optimal locality preferences of YARN containers by considering + * the node ratio of pending tasks, the number of required cores/containers, and the locality of + * currently existing containers. The goal of this algorithm is to maximize the number of tasks + * that would run locally. + * + * The details of this algorithm are described below. Suppose we have 20 tasks which + * require (host1, host2, host3) and 10 tasks which require (host1, host2, host4); moreover, each + * container has 2 cores and cpus per task is 1, so the required container number is + * 15, and the host ratio is (host1: 30, host2: 30, host3: 20, host4: 10). + * + * 1. 
If the requested container number (18) is more than the required container number (15): + * + * requests for 5 containers with nodes: (host1, host2, host3, host4) + * requests for 5 containers with nodes: (host1, host2, host3) + * requests for 5 containers with nodes: (host1, host2) + * requests for 3 containers with no locality preferences. + * + * The placement ratio is 3 : 3 : 2 : 1, and the additional containers are requested with no + * locality preferences. + * + * 2. If the requested container number (10) is less than or equal to the required container number + * (15): + * + * requests for 4 containers with nodes: (host1, host2, host3, host4) + * requests for 3 containers with nodes: (host1, host2, host3) + * requests for 3 containers with nodes: (host1, host2) + * + * The placement ratio is 10 : 10 : 7 : 4, close to the expected ratio (3 : 3 : 2 : 1) + * + * 3. If containers already exist but none match the localities, follow methods 1 and 2. + * + * 4. If containers already exist and some localities match. For example, if we have 1 + * container on each node (host1: 1, host2: 1, host3: 1, host4: 1), and the expected containers + * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2), + * so the newly requested containers on each node would be updated to (host1: 4, host2: 4, + * host3: 3, host4: 1), 12 containers in total. + * + * 4.1 If the requested container number (18) is more than the newly required containers (12), + * follow method 1 with the updated ratio 4 : 4 : 3 : 1. + * + * 4.2 If the requested container number (10) is less than or equal to the newly required + * containers (12), follow method 2 with the updated ratio 4 : 4 : 3 : 1. + * + * 5. If containers already exist and existing localities can
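The shrinking node-set pattern in cases 1 and 2 above can be sketched as follows (a simplified model under the quoted example's assumptions of 2 cores per container and 1 cpu per task; the function name and per-host quota rule are illustrative, not the PR's actual implementation):

```python
import math

def locality_preferences(host_task_counts, tasks_per_container, num_requested):
    # For each requested container, list the hosts that still have
    # locality-preferring tasks left to place; containers beyond the
    # required count get no locality preference.
    quota = {h: math.ceil(c / tasks_per_container)
             for h, c in host_task_counts.items()}
    required = max(quota.values(), default=0)
    prefs = []
    for i in range(1, num_requested + 1):
        if i <= required:
            prefs.append(tuple(sorted(h for h, q in quota.items() if q >= i)))
        else:
            prefs.append(())  # extra container: no preference
    return prefs
```

With the example counts (host1: 30, host2: 30, host3: 20, host4: 10) and 18 requested containers, this reproduces the 5 / 5 / 5 / 3 split of case 1: five requests naming all four hosts, five naming (host1, host2, host3), five naming (host1, host2), and three with no preference.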
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34214513 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -872,6 +872,25 @@ class DAGScheduler( // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) +val taskIdToLocations = try { + stage match { +case s: ShuffleMapStage => + partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap +case s: ResultStage => + val job = s.resultOfJob.get + partitionsToCompute.map { id => +val p: Int = job.partitions(id) --- End diff -- No need for `: Int`
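The stage match in the quoted hunk boils down to one indirection for result stages: a partition id is first translated through the job's partition list before the preferred locations are looked up. A hedged Python paraphrase (dict-based stand-ins for the Scala stage classes, names illustrative):

```python
def task_id_to_locations(stage, get_preferred_locs):
    # Shuffle map stage: the partition id is used directly.
    # Result stage: the id indexes into the job's partition list first.
    if stage["kind"] == "shuffle_map":
        return {pid: get_preferred_locs(pid)
                for pid in stage["partitions_to_compute"]}
    job_partitions = stage["job_partitions"]
    return {pid: get_preferred_locs(job_partitions[pid])
            for pid in stage["partitions_to_compute"]}
```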
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34215043 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- (same hunk as quoted above)
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34215363 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -627,6 +641,29 @@ private[spark] class ExecutorAllocationManager( def isExecutorIdle(executorId: String): Boolean = { !executorIdToTaskIds.contains(executorId) } + +/** + * Get the number of locality aware pending tasks and related locality preferences as the + * hints used for executor allocation. + */ +def executorPlacementHints(): (Int, Map[String, Int]) = --- End diff -- Do any tests exercise this code?
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34214210 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -627,6 +641,29 @@ private[spark] class ExecutorAllocationManager( def isExecutorIdle(executorId: String): Boolean = { !executorIdToTaskIds.contains(executorId) } + +/** + * Get the number of locality aware pending tasks and related locality preferences as the + * hints used for executor allocation. + */ +def executorPlacementHints(): (Int, Map[String, Int]) = + allocationManager.synchronized { --- End diff -- indent everything inside the synchronized block
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34214494 --- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala --- @@ -872,6 +872,25 @@ class DAGScheduler( // will be posted, which should always come after a corresponding SparkListenerStageSubmitted // event. stage.latestInfo = StageInfo.fromStage(stage, Some(partitionsToCompute.size)) +val taskIdToLocations = try { + stage match { +case s: ShuffleMapStage => + partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap +case s: ResultStage => + val job = s.resultOfJob.get + partitionsToCompute.map { id => +val p: Int = job.partitions(id) +(id, getPreferredLocs(stage.rdd, p)) + }.toMap + } +} catch { + case NonFatal(e) => --- End diff -- Why would we expect an exception to occur here?
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34214893 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- (same hunk as quoted above)
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r34215406 --- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import org.scalatest.{BeforeAndAfterEach, Matchers} + +import org.apache.spark.SparkFunSuite + +class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { + + private val yarnAllocatorSuite = new YarnAllocatorSuite --- End diff -- Is this used for anything? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-7263] Add new shuffle manager which sto...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7265#issuecomment-119344309 Is my understanding correct that, with this shuffle manager, we wouldn't be able to do a reduce side that sorts records, or any map-side spilling?
[GitHub] spark pull request: [SPARK-7263] Add new shuffle manager which sto...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7265#issuecomment-119350825 Having the Parquet shuffle reader follow that pattern seems preferable to me over failing when spilling would be required.
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33515526 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution} +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.rdd.RDD + +/** + * Conduct the two-sided Kolmogorov-Smirnov test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution, we can provide a test for + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on the KS test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test + * + * Implementation note: We seek to implement the KS test with a minimal number of distributed + * passes. 
We sort the RDD, and then perform the following operations on a per-partition basis: + * calculate an empirical cumulative distribution value for each observation, and a theoretical + * cumulative distribution value. We know the latter to be correct, while the former will be off by + * a constant (how large the constant is depends on how many values precede it in other partitions). + * However, given that this constant simply shifts the ECDF upwards but doesn't change its shape, + * and that the constant is the same within a given partition, we can pick 2 values + * in each partition that can potentially resolve to the largest global distance. Namely, we + * pick the minimum distance and the maximum distance. Additionally, we keep track of how many + * elements are in each partition. Once these three values have been returned for every partition, + * we can collect and operate locally. Locally, we can now adjust each distance by the appropriate + * constant (the cumulative sum of the number of elements in the prior partitions divided by the + * data set size). Finally, we take the maximum absolute value, and this is the statistic. + */ +private[stat] object KSTest { + + // Null hypothesis for the type of KS test to be included in the result. + object NullHypothesis extends Enumeration { + type NullHypothesis = Value + val oneSampleTwoSided = Value("Sample follows theoretical distribution.") 
+ } + + /** + * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution + * @param data `RDD[Double]` data on which to run test + * @param cdf `Double => Double` function to calculate the theoretical CDF + * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis) + */ + def testOneSample(data: RDD[Double], cdf: Double => Double): KSTestResult = { + val n = data.count().toDouble + val localData = data.sortBy(x => x).mapPartitions { part => + val partDiffs = oneSampleDifferences(part, n, cdf) // local distances --- End diff -- indent these two spaces
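The scheme described in the implementation note above (sort, summarize each partition by its two extreme local distances plus its element count, then shift each extreme by the cumulative prior counts on the driver) can be sketched in plain Python, with lists standing in for RDD partitions. This is an illustrative sketch of the algorithm only, not the PR's Scala code, and the function name is hypothetical.

```python
import math

def ks_statistic(data, cdf, num_partitions=4):
    # One-sample, two-sided KS statistic computed per-partition as in the
    # note above: each "partition" reports only its extreme local distances
    # and its element count; the driver shifts each extreme by
    # (#elements in prior partitions) / n before taking the max.
    data = sorted(data)
    n = len(data)
    size = math.ceil(n / num_partitions)
    parts = [data[i:i + size] for i in range(0, n, size)]

    summaries = []  # (min local distance, max local distance, count)
    for part in parts:
        diffs = []
        for i, x in enumerate(part, start=1):
            # Local ECDF uses only this partition's indices, so both
            # distances are off by the same per-partition constant.
            diffs.append(i / n - cdf(x))
            diffs.append((i - 1) / n - cdf(x))
        summaries.append((min(diffs), max(diffs), len(part)))

    # Driver side: adjust the extremes by the constant, take max |distance|.
    stat, prior = 0.0, 0
    for lo, hi, cnt in summaries:
        shift = prior / n
        stat = max(stat, abs(lo + shift), abs(hi + shift))
        prior += cnt
    return stat
```

For a uniform theoretical distribution (`cdf = lambda x: x`), this agrees with a single-machine computation that evaluates every observation's distances directly, since the per-partition shift is constant and the global extreme must come from a partition's local minimum or maximum.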
[GitHub] spark pull request: [SPARK-4069] [YARN] When AppMaster finishes, t...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/5233#issuecomment-116877465 There's no bug on the Spark side. @PraveenSeluka is this still something you're running into? If this is causing pain to a lot of users it could be worth the workaround, but otherwise I'd close this.
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6994#issuecomment-116876951 This LGTM. Can you file a separate JIRA to add Python support?
[GitHub] spark pull request: [SPARK-8374] [YARN] Job frequently hangs after ...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7083#issuecomment-116882631 Hi @xuchenCN what makes you think this is the right fix? We already decrement `numExecutorsRunning` when a container is killed, so I think doing it again would be double-counting.
[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6679#issuecomment-115565919 Hi @watermen, thanks for reporting this. Does the error occur every time or just occasionally? What Hadoop version are you running?
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33394080 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33394012 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33393943 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- + val localData = data.sortBy(x => x).mapPartitions { part => + val partDiffs = oneSampleDifferences(part, n, cdf) // local distances + searchOneSampleCandidates(partDiffs) // candidates: local extrema + }.collect() --- End diff -- indent this back two spaces
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33394039 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33393914 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution} +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.rdd.RDD + +/** + * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on KS Test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test + * + * Implementation note: We seek to implement the KS test with a minimal number of distributed + * passes. 
We sort the RDD, and then perform the following operations on a per-partition basis: + * calculate an empirical cumulative distribution value for each observation, and a theoretical + * cumulative distribution value. We know the latter to be correct, while the former will be off by + * a constant (how large the constant is depends on how many values precede it in other partitions). + * However, given that this constant simply shifts the ECDF upwards, but doesn't change its shape, + * and furthermore, that constant is the same within a given partition, we can pick 2 values + * in each partition that can potentially resolve to the largest global distance. Namely, we + * pick the minimum distance and the maximum distance. Additionally, we keep track of how many + * elements are in each partition. Once these three values have been returned for every partition, + * we can collect and operate locally. Locally, we can now adjust each distance by the appropriate + * constant (the cumulative sum of # of elements in the prior partitions divided by the data set + * size). Finally, we take the maximum absolute value, and this is the statistic. + */ +private[stat] object KSTest { + + // Null hypothesis for the type of KS test to be included in the result. + object NullHypothesis extends Enumeration { +type NullHypothesis = Value +val oneSampleTwoSided = Value(Sample follows theoretical distribution.) 
+ } + + /** + * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution + * @param data `RDD[Double]` data on which to run the test + * @param cdf `Double => Double` function to calculate the theoretical CDF + * @return KSTestResult summarizing the test results (p-value, statistic, and null hypothesis) + */ + def testOneSample(data: RDD[Double], cdf: Double => Double): KSTestResult = { +val n = data.count().toDouble +val localData = data.sortBy(x => x).mapPartitions { part => + val partDiffs = oneSampleDifferences(part, n, cdf) // local distances + searchOneSampleCandidates(partDiffs) // candidates: local extrema + }.collect() +val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme +evalOneSampleP(ksStat, n.toLong) + } + + /** + * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution + * @param data `RDD[Double]` data on which to run the test + * @param createDist `() => RealDistribution` function to create a theoretical distribution + * @return KSTestResult summarizing the test results (p-value, statistic, and null hypothesis) + */ + def testOneSample(data: RDD[Double], createDist: () => RealDistribution): KSTestResult = { +val n = data.count().toDouble +val localData = data.sortBy(x => x).mapPartitions { part => + val
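The candidate-and-adjust scheme in the implementation note above can be checked locally. The following plain-Python sketch stands in for the distributed steps (partitions are plain sorted lists; `ks_candidates` and `ks_statistic` are illustrative names, not the PR's API):

```python
def ks_candidates(partition, n, cdf):
    """One partition's contribution: (min diff, max diff, element count).

    Diffs are "unadjusted": the empirical CDF uses only the local index, so
    every diff is off by the same constant, (#elements in prior partitions)/n.
    """
    diffs = []
    for i, x in enumerate(partition):
        t = cdf(x)
        diffs.append((i + 1) / n - t)  # ECDF value just above x
        diffs.append(i / n - t)        # ECDF value just below x
    return min(diffs), max(diffs), len(partition)

def ks_statistic(partitions, cdf):
    """Combine per-partition candidates into the global KS statistic."""
    n = float(sum(len(p) for p in partitions))
    stat, prior = 0.0, 0
    for part in partitions:
        lo, hi, count = ks_candidates(part, n, cdf)
        const = prior / n  # the shift for this partition's local ECDF
        stat = max(stat, abs(lo + const), abs(hi + const))
        prior += count
    return stat
```

Because the shift is identical within a partition, only the local minimum and maximum differences can become the global extreme after adjustment, which is why three numbers per partition suffice.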
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6994#issuecomment-115876109 This looks great, just had a few more minor style comments. Can you also add some documentation here: https://spark.apache.org/docs/latest/mllib-statistics.html#hypothesis-testing? The source for this document is in docs/mllib-statistics.md. @jkbradley @mengxr any comments before I merge this? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: SPARK-8623. Hadoop RDDs fail to properly seria...
GitHub user sryza opened a pull request: https://github.com/apache/spark/pull/7050 SPARK-8623. Hadoop RDDs fail to properly serialize configuration You can merge this pull request into a Git repository by running: $ git pull https://github.com/sryza/spark sandy-spark-8623 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/7050.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #7050 commit 58a80799ee953d19d3bf05906bcb00eb57da4c2c Author: Sandy Ryza sa...@cloudera.com Date: 2015-06-26T22:19:51Z SPARK-8623. Hadoop RDDs fail to properly serialize configuration
[GitHub] spark pull request: SPARK-8623. Hadoop RDDs fail to properly seria...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7050#issuecomment-115910651 jenkins, retest this please
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33315886 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) + +private[yarn] trait ContainerPlacementStrategy { + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * @param numLocalityAwarePendingTasks number of pending tasks with locality preferences + * @param preferredLocalityToCount a map from preferred host name to its required count + * + * @return node localities and rack localities; each locality is an array of strings, + * and the number of localities is the same as the number of containers + */ + def localityOfRequestedContainers( + numContainer: Int, + numLocalityAwarePendingTasks: Int, + preferredLocalityToCount: Map[String, Int] +): Array[ContainerLocalityPreferences] +} + +/** + * This strategy calculates the optimal locality preferences of YARN containers by considering + * the node ratio of pending tasks, the number of required cores/containers, and the locality of + * currently existing containers. The goal of this algorithm is to maximize the number of tasks + * that would run locally. + * + * The details of this algorithm are described below. Suppose we have 20 tasks which + * require (host1, host2, host3) and 10 tasks which require (host1, host2, host4); given that + * each container has 2 cores and each task uses 1 cpu, the required container number is + * 15, and the host ratio is (host1: 30, host2: 30, host3: 20, host4: 10). + * + * 1. If the requested container number (18) is more than the required container number (15): + * + * requests for 5 containers with nodes: (host1, host2, host3, host4) + * requests for 5 containers with nodes: (host1, host2, host3) + * requests for 5 containers with nodes: (host1, host2) + * requests for 3 containers with no locality preferences. + * + * The placement ratio is 3 : 3 : 2 : 1, and the additional containers are requested with no + * locality preferences. + * + * 2. If the requested container number (10) is less than or equal to the required container + * number (15): + * + * requests for 4 containers with nodes: (host1, host2, host3, host4) + * requests for 3 containers with nodes: (host1, host2, host3) + * requests for 3 containers with nodes: (host1, host2) + * + * The placement ratio is 10 : 10 : 7 : 4, close to the expected ratio (3 : 3 : 2 : 1) + * + * 3. If containers exist but no localities match, follow methods 1 and 2. + * + * 4. If containers exist and some localities match. For example, if we have 1 + * container on each node (host1: 1, host2: 1, host3: 1, host4: 1), and the expected containers + * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2), + * then the newly requested containers on each node would be updated to (host1: 4, host2: 4, + * host3: 3, host4: 1), 12 containers in total. + * + * 4.1 If the requested container number (18) is more than the newly required containers (12), + * follow method 1 with the updated ratio 4 : 4 : 3 : 1. + * + * 4.2 If the requested container number (10) is less than or equal to the newly required + * containers (12), follow method 2 with the updated ratio 4 : 4 : 3 : 1. + * + * 5. If containers exist and their localities can fully cover the requested localities. + * For example if we have 5 containers on each
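The worked examples above can be reproduced by decomposing the per-host ratio into nested locality groups and splitting the requested containers across the groups proportionally. This is a hedged local sketch of that arithmetic (`locality_requests` is an illustrative name; the PR's actual placement code may differ):

```python
def locality_requests(ratio, num_requested, num_required):
    """Split container requests into nested locality groups.

    ratio maps host -> pending-task weight, e.g. the doc's 3 : 3 : 2 : 1.
    Returns (nodes, count) pairs; nodes is None for no locality preference.
    """
    hosts = sorted(ratio, key=ratio.get, reverse=True)
    # Build nested groups: every host with weight >= level joins that layer.
    layers, prev = [], 0
    for level in sorted(set(ratio.values())):
        nodes = tuple(h for h in hosts if ratio[h] >= level)
        layers.append((nodes, level - prev))
        prev = level
    # Only num_required containers get locality preferences (cases 1 and 2).
    to_place = min(num_requested, num_required)
    total_w = sum(w for _, w in layers)
    exact = [to_place * w / total_w for _, w in layers]
    counts = [int(e) for e in exact]
    # Largest-remainder rounding so the counts sum to to_place.
    leftovers = sorted(range(len(exact)), key=lambda i: counts[i] - exact[i])
    for i in leftovers[:to_place - sum(counts)]:
        counts[i] += 1
    requests = [(nodes, c) for (nodes, _), c in zip(layers, counts)]
    if num_requested > num_required:
        requests.append((None, num_requested - num_required))
    return requests
```

Matching the doc's numbers: 10 requested containers yield 4/3/3 across the three groups (per-host ratio 10 : 10 : 7 : 4), while 18 requested yield 5/5/5 plus 3 with no preference.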
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33316253 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala ---
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33315534 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala --- @@ -28,7 +28,10 @@ private[spark] trait ExecutorAllocationClient { * This can result in canceling pending requests or filing additional requests. * @return whether the request is acknowledged by the cluster manager. */ - private[spark] def requestTotalExecutors(numExecutors: Int): Boolean + private[spark] def requestTotalExecutors( + numExecutors: Int, --- End diff -- Document the new parameters
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33317386 --- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import org.scalatest.{BeforeAndAfterEach, Matchers} + +import org.apache.spark.SparkFunSuite + +class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { + + private val yarnAllocatorSuite = new YarnAllocatorSuite + import yarnAllocatorSuite._ + + override def beforeEach() { +yarnAllocatorSuite.beforeEach() + } + + override def afterEach() { +yarnAllocatorSuite.afterEach() + } + + test("allocate locality preferred containers with enough resource and no matched existed " + +"containers") { +// 1. All the locations of current containers cannot satisfy the new requirements +// 2. Current requested container number can fully satisfy the pending tasks. + +val handler = createAllocator(2) +handler.updateResourceRequests() +handler.handleAllocatedContainers(Array( + createContainer("host1"), + createContainer("host2"))) + +val nodeLocalities = + handler.containerPlacementStrategy.localityOfRequestedContainers(3, 15, +Map("host3" -> 15, "host4" -> 15, "host5" -> 10)) + .map(_.nodes) + +assert(nodeLocalities === Array( + Array("host3", "host4", "host5"), + Array("host3", "host4", "host5"), + Array("host3", "host4"))) + } + + test("allocate locality preferred containers with enough resource and partially matched " + +"containers") { +// 1. Parts of current containers' locations can satisfy the new requirements +// 2. Current requested container number can fully satisfy the pending tasks. + +val handler = createAllocator(3) +handler.updateResourceRequests() +handler.handleAllocatedContainers(Array( + createContainer("host1"), + createContainer("host1"), + createContainer("host2") +)) + +val nodeLocalities = + handler.containerPlacementStrategy.localityOfRequestedContainers(3, 15, +Map("host1" -> 15, "host2" -> 15, "host3" -> 10)) + .map(_.nodes) + +assert(nodeLocalities === Array( + null, // requested container with no locality preference + Array("host2", "host3"), + Array("host2", "host3"))) + } + + test("allocate locality preferred containers with limited resource and partially matched " + +"containers") { +// 1. Parts of current containers' locations can satisfy the new requirements +// 2. Current requested container number cannot fully satisfy the pending tasks. + +val handler = createAllocator(3) +handler.updateResourceRequests() +handler.handleAllocatedContainers(Array( + createContainer("host1"), + createContainer("host1"), + createContainer("host2") +)) + +val nodeLocalities = + handler.containerPlacementStrategy.localityOfRequestedContainers(1, 15, +Map("host1" -> 15, "host2" -> 15, "host3" -> 10)) + .map(_.nodes) + +assert(nodeLocalities === Array( + /** newly requested locality preferred containers */ + Array("host2", "host3"))) + } + + test("allocate locality preferred containers with fully matched containers") { +// Current containers' locations can fully satisfy the new requirements + +val handler = createAllocator(5) +handler.updateResourceRequests() +handler.handleAllocatedContainers(Array( + createContainer("host1"), + createContainer("host1"), + createContainer("host2"), + createContainer("host2"), + createContainer("host3") +)) + +val nodeLocalities
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33317505 --- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import org.scalatest.{BeforeAndAfterEach, Matchers} + +import org.apache.spark.SparkFunSuite + +class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { + + private val yarnAllocatorSuite = new YarnAllocatorSuite + import yarnAllocatorSuite._ + + override def beforeEach() { +yarnAllocatorSuite.beforeEach() + } + + override def afterEach() { +yarnAllocatorSuite.afterEach() + } + + test("allocate locality preferred containers with enough resource and no matched existed " + +"containers") { +// 1. All the locations of current containers cannot satisfy the new requirements +// 2. Current requested container number can fully satisfy the pending tasks. + +val handler = createAllocator(2) +handler.updateResourceRequests() +handler.handleAllocatedContainers(Array( --- End diff -- Can fit on a single line?
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33317634 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala ---
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33317476 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala --- @@ -149,7 +159,13 @@ private[yarn] class YarnAllocator( * * @return Whether the new requested total is different than the old value. */ - def requestTotalExecutors(requestedTotal: Int): Boolean = synchronized { + def requestTotalExecutorsWithPreferredLocalities( --- End diff -- Document the new arguments.
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33321039 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution} +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.rdd.RDD + + +/** + * Conduct the two-sided Kolmogorov-Smirnov test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on the KS test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test + */ +private[stat] object KSTest { + + // Null hypothesis for the type of KS test to be included in the result. + object NullHypothesis extends Enumeration { +type NullHypothesis = Value +val oneSampleTwoSided = Value("Sample follows theoretical distribution.") + } + + /** + * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution + * @param data `RDD[Double]` data on which to run the test + * @param cdf `Double => Double` function to calculate the theoretical CDF + * @return KSTestResult summarizing the test results (p-value, statistic, and null hypothesis) + */ + def testOneSample(data: RDD[Double], cdf: Double => Double): KSTestResult = { +val n = data.count().toDouble +val localData = data.sortBy(x => x).mapPartitions { + part => --- End diff -- put part on the line above and indent the rest back two spaces
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33321013 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala --- @@ -158,4 +158,25 @@ object Statistics { def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = { ChiSqTest.chiSquaredFeatures(data) } + + /** + * Conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability distribution equality + * @param data an `RDD[Double]` containing the sample of data to test + * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value + * @return KSTestResult object containing the test statistic, p-value, and null hypothesis. + */ + def ksTest(data: RDD[Double], cdf: Double => Double): KSTestResult = { +KSTest.testOneSample(data, cdf) + } + + /** + * A convenience method to conduct a one-sample, two-sided Kolmogorov-Smirnov test for + * probability distribution equality + * @param data an `RDD[Double]` containing the sample of data to test + * @param name a `String` name for a theoretical distribution --- End diff -- Mention what distributions are supported
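Regarding the review comment above ("Mention what distributions are supported"): a name-based overload ultimately needs a lookup from distribution name to CDF, with an explicit error listing the supported names. A minimal sketch of that dispatch (all names here are illustrative, not the PR's code, which builds commons-math `RealDistribution` instances):

```python
import math

def norm_cdf(mu=0.0, sigma=1.0):
    """CDF of a normal distribution, computed via the error function."""
    return lambda x: 0.5 * (1.0 + math.erf((x - mu) / (sigma * math.sqrt(2.0))))

# Map of supported distribution names to CDF factories (illustrative).
DISTRIBUTIONS = {"norm": norm_cdf}

def cdf_for(name, *params):
    """Return a CDF for the named distribution, or fail with a clear message."""
    if name not in DISTRIBUTIONS:
        raise ValueError(
            "unsupported distribution %r; supported: %s" % (name, sorted(DISTRIBUTIONS)))
    return DISTRIBUTIONS[name](*params)
```

Calling `cdf_for("norm")` with no parameters falls back to the standard normal, mirroring the convenience-method idea in the quoted diff.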
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33321093 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution} +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.rdd.RDD + + +/** + * Conduct the two-sided Kolmogorov-Smirnov test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution, we can provide a test for + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on the KS test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test + */ +private[stat] object KSTest { + + // Null hypothesis for the type of KS test to be included in the result. + object NullHypothesis extends Enumeration { + type NullHypothesis = Value + val oneSampleTwoSided = Value("Sample follows theoretical distribution.") + } + + /** + * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution + * @param data `RDD[Double]` data on which to run test + * @param cdf `Double => Double` function to calculate the theoretical CDF + * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis) + */ + def testOneSample(data: RDD[Double], cdf: Double => Double): KSTestResult = { + val n = data.count().toDouble + val localData = data.sortBy(x => x).mapPartitions { part => + val partDiffs = oneSampleDifferences(part, n, cdf) // local distances + searchOneSampleCandidates(partDiffs) // candidates: local extrema + }.collect() + val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme + evalOneSampleP(ksStat, n.toInt) + } + + /** + * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution + * @param data `RDD[Double]` data on which to run test + * @param createDist `() => RealDistribution` function to create a theoretical distribution + * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis) + */ + def testOneSample(data: RDD[Double], createDist: () => RealDistribution): KSTestResult = { + val n = data.count().toDouble + val localData = data.sortBy(x => x).mapPartitions { part => + val partDiffs = oneSampleDifferences(part, n, createDist) // local distances + searchOneSampleCandidates(partDiffs) // candidates: local extrema + }.collect() + val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme + evalOneSampleP(ksStat, n.toInt) + } + + /** + * Calculate unadjusted distances between the empirical CDF and the theoretical CDF in a + * partition + * @param partData `Iterator[Double]` 1 partition of a sorted RDD + * @param n `Double` the total size of the RDD + * @param cdf `Double => Double` a function that calculates the theoretical CDF of a value + * @return `Iterator[Double]` Unadjusted (i.e. off by a constant) differences between + * ECDF (empirical cumulative distribution function) and CDF. We subtract in such a way + * that when adjusted by the appropriate constant, the difference will be equivalent + * to the KS statistic calculation described in + * http://www.itl.nist.gov/div898/handbook/eda/section3/eda35g.htm + * where the difference is not exactly symmetric + */ + private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double
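The distributed computation quoted above (per-partition unadjusted differences, then local extrema, then a global maximum) reduces to the classic one-sample statistic. A minimal single-machine Python sketch of that statistic, with hypothetical helper names (this is an illustration, not the Spark API):

```python
import math

def norm_cdf(x, mean=0.0, sd=1.0):
    # CDF of a normal distribution, via the error function.
    return 0.5 * (1.0 + math.erf((x - mean) / (sd * math.sqrt(2.0))))

def ks_one_sample_statistic(data, cdf):
    """Two-sided one-sample KS statistic: the largest gap between the
    empirical CDF of `data` and the theoretical `cdf`."""
    xs = sorted(data)
    n = len(xs)
    d = 0.0
    for i, x in enumerate(xs):
        lower = i / n        # ECDF just below x
        upper = (i + 1) / n  # ECDF at x
        # The gap can open on either side of the step, hence the two terms.
        d = max(d, cdf(x) - lower, upper - cdf(x))
    return d

sample = [-1.2, -0.4, 0.1, 0.7, 1.5]
stat = ks_one_sample_statistic(sample, norm_cdf)
```

The two `max` terms mirror the not-exactly-symmetric subtraction the quoted docstring describes; in the Spark version each partition would compute only its local candidates before a single `collect()`.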
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33321019 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution} +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.rdd.RDD + --- End diff -- extra newline --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33321567 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,181 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution} +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.rdd.RDD + + +/** + * Conduct the two-sided Kolmogorov-Smirnov test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution, we can provide a test for + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on the KS test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test --- End diff -- Can you include a note on how this is implemented?
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33316936 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) + +private[yarn] trait ContainerPlacementStrategy { + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * @param numLocalityAwarePendingTasks number of pending tasks with locality requirements + * @param preferredLocalityToCount a map to store the preferred host name and its required count + * + * @return node localities and rack localities; each locality is an array of strings, + * and the number of localities is the same as the number of containers + */ + def localityOfRequestedContainers( + numContainer: Int, + numLocalityAwarePendingTasks: Int, + preferredLocalityToCount: Map[String, Int] +): Array[ContainerLocalityPreferences] +} + +/** + * This strategy calculates the optimal locality preferences of YARN containers by considering + * the node ratio of pending tasks, the number of required cores/containers, and the locality of + * currently existing containers. The goal of this algorithm is to maximize the number of tasks + * that would run locally. + * + * The details of this algorithm are described below. Suppose we have 20 tasks which + * require (host1, host2, host3) and 10 tasks which require (host1, host2, host4), + * where each container has 2 cores and each task uses 1 cpu, so the required container number is + * 15, and the host ratio is (host1: 30, host2: 30, host3: 20, host4: 10). + * + * 1. If requested container number (18) is more than the required container number (15): + * + * requests for 5 containers with nodes: (host1, host2, host3, host4) + * requests for 5 containers with nodes: (host1, host2, host3) + * requests for 5 containers with nodes: (host1, host2) + * requests for 3 containers with no locality preferences. + * + * The placement ratio is 3 : 3 : 2 : 1, and the additional containers are requested with no + * locality preferences. + * + * 2. If requested container number (10) is less than or equal to the required container number + * (15): + * + * requests for 4 containers with nodes: (host1, host2, host3, host4) + * requests for 3 containers with nodes: (host1, host2, host3) + * requests for 3 containers with nodes: (host1, host2) + * + * The placement ratio is 10 : 10 : 7 : 4, close to the expected ratio (3 : 3 : 2 : 1) + * + * 3. If containers exist but none of their localities match, follow methods 1 and 2. + * + * 4. If containers exist and some localities match. For example, if we have 1 + * container on each node (host1: 1, host2: 1, host3: 1, host4: 1), and the expected containers + * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2), + * then the newly requested containers on each node would be updated to (host1: 4, host2: 4, + * host3: 3, host4: 1), 12 containers in total. + * + * 4.1 If requested container number (18) is more than newly required containers (12), follow + * method 1 with updated ratio 4 : 4 : 3 : 1. + * + * 4.2 If requested container number (10) is less than or equal to newly required containers + * (12), follow method 2 with updated ratio 4 : 4 : 3 : 1. + * + * 5. If containers exist and their localities fully cover the requested localities. + * For example if we have 5 containers on each
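The ratio bookkeeping in the quoted doc comment (15 required containers split across a 30 : 30 : 20 : 10 host ratio, giving the expected per-host counts 5 : 5 : 4 : 2 used in step 4) can be reproduced with a small sketch. The helper name is hypothetical, and this is an illustration of the arithmetic only, not the Spark implementation:

```python
def expected_containers_per_host(host_to_task_count, num_required_containers):
    """Split a required container count across hosts in proportion to each
    host's share of pending tasks, rounding up (integer ceiling) so each
    host's preference is fully covered."""
    total = sum(host_to_task_count.values())
    return {
        host: (num_required_containers * cnt + total - 1) // total
        for host, cnt in host_to_task_count.items()
    }

# Per-host task counts from the quoted example: 20 tasks preferring
# (host1, host2, host3) plus 10 tasks preferring (host1, host2, host4).
expected = expected_containers_per_host(
    {"host1": 30, "host2": 30, "host3": 20, "host4": 10}, 15)
```

Subtracting already-allocated containers from `expected` then yields the "newly requested" counts of step 4 (e.g. one existing container per host gives 4 : 4 : 3 : 1).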
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33317230 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) + +private[yarn] trait ContainerPlacementStrategy { + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * @param numLocalityAwarePendingTasks number of pending tasks with locality requirements + * @param preferredLocalityToCount a map to store the preferred host name and its required count + * + * @return node localities and rack localities; each locality is an array of strings, + * and the number of localities is the same as the number of containers + */ + def localityOfRequestedContainers( + numContainer: Int, + numLocalityAwarePendingTasks: Int, + preferredLocalityToCount: Map[String, Int] +): Array[ContainerLocalityPreferences] +} + +/** + * This strategy calculates the optimal locality preferences of YARN containers by considering + * the node ratio of pending tasks, the number of required cores/containers, and the locality of + * currently existing containers. The goal of this algorithm is to maximize the number of tasks + * that would run locally. + * + * The details of this algorithm are described below. Suppose we have 20 tasks which + * require (host1, host2, host3) and 10 tasks which require (host1, host2, host4), + * where each container has 2 cores and each task uses 1 cpu, so the required container number is + * 15, and the host ratio is (host1: 30, host2: 30, host3: 20, host4: 10). + * + * 1. If requested container number (18) is more than the required container number (15): + * + * requests for 5 containers with nodes: (host1, host2, host3, host4) + * requests for 5 containers with nodes: (host1, host2, host3) + * requests for 5 containers with nodes: (host1, host2) + * requests for 3 containers with no locality preferences. + * + * The placement ratio is 3 : 3 : 2 : 1, and the additional containers are requested with no + * locality preferences. + * + * 2. If requested container number (10) is less than or equal to the required container number + * (15): + * + * requests for 4 containers with nodes: (host1, host2, host3, host4) + * requests for 3 containers with nodes: (host1, host2, host3) + * requests for 3 containers with nodes: (host1, host2) + * + * The placement ratio is 10 : 10 : 7 : 4, close to the expected ratio (3 : 3 : 2 : 1) + * + * 3. If containers exist but none of their localities match, follow methods 1 and 2. + * + * 4. If containers exist and some localities match. For example, if we have 1 + * container on each node (host1: 1, host2: 1, host3: 1, host4: 1), and the expected containers + * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2), + * then the newly requested containers on each node would be updated to (host1: 4, host2: 4, + * host3: 3, host4: 1), 12 containers in total. + * + * 4.1 If requested container number (18) is more than newly required containers (12), follow + * method 1 with updated ratio 4 : 4 : 3 : 1. + * + * 4.2 If requested container number (10) is less than or equal to newly required containers + * (12), follow method 2 with updated ratio 4 : 4 : 3 : 1. + * + * 5. If containers exist and their localities fully cover the requested localities. + * For example if we have 5 containers on each
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33317350 --- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import org.scalatest.{BeforeAndAfterEach, Matchers} + +import org.apache.spark.SparkFunSuite + +class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { + + private val yarnAllocatorSuite = new YarnAllocatorSuite + import yarnAllocatorSuite._ + + override def beforeEach() { + yarnAllocatorSuite.beforeEach() + } + + override def afterEach() { + yarnAllocatorSuite.afterEach() + } + + test("allocate locality preferred containers with enough resource and no matched existed " + "containers") { + // 1. All the locations of current containers cannot satisfy the new requirements + // 2. Current requested container number can fully satisfy the pending tasks. + + val handler = createAllocator(2) + handler.updateResourceRequests() + handler.handleAllocatedContainers(Array( + createContainer("host1"), + createContainer("host2"))) + + val nodeLocalities = + handler.containerPlacementStrategy.localityOfRequestedContainers(3, 15, + Map("host3" -> 15, "host4" -> 15, "host5" -> 10)) + .map(_.nodes) + + assert(nodeLocalities === Array( + Array("host3", "host4", "host5"), + Array("host3", "host4", "host5"), + Array("host3", "host4"))) + } + + test("allocate locality preferred containers with enough resource and partially matched " + "containers") { + // 1. Parts of current containers' locations can satisfy the new requirements + // 2. Current requested container number can fully satisfy the pending tasks. + + val handler = createAllocator(3) + handler.updateResourceRequests() + handler.handleAllocatedContainers(Array( + createContainer("host1"), + createContainer("host1"), + createContainer("host2") + )) + + val nodeLocalities = + handler.containerPlacementStrategy.localityOfRequestedContainers(3, 15, + Map("host1" -> 15, "host2" -> 15, "host3" -> 10)) + .map(_.nodes) + + assert(nodeLocalities === Array( + null, // requested container with no locality preference + Array("host2", "host3"), + Array("host2", "host3"))) + } + + test("allocate locality preferred containers with limited resource and partially matched " + "containers") { + // 1. Parts of current containers' locations can satisfy the new requirements + // 2. Current requested container number cannot fully satisfy the pending tasks. + + val handler = createAllocator(3) + handler.updateResourceRequests() + handler.handleAllocatedContainers(Array( + createContainer("host1"), + createContainer("host1"), + createContainer("host2") + )) + + val nodeLocalities = + handler.containerPlacementStrategy.localityOfRequestedContainers(1, 15, + Map("host1" -> 15, "host2" -> 15, "host3" -> 10)) + .map(_.nodes) + + assert(nodeLocalities === Array( + /** newly requested locality preferred containers */ + Array("host2", "host3"))) + } + + test("allocate locality preferred containers with fully matched containers") { + // Current containers' locations can fully satisfy the new requirements + + val handler = createAllocator(5) + handler.updateResourceRequests() + handler.handleAllocatedContainers(Array( + createContainer("host1"), + createContainer("host1"), + createContainer("host2"), + createContainer("host2"), + createContainer("host3") + )) + + val nodeLocalities
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33321131 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala --- @@ -90,3 +90,18 @@ class ChiSqTestResult private[stat] (override val pValue: Double, super.toString } } + +/** + * :: Experimental :: + * Object containing the test results for the Kolmogorov-Smirnov test. + */ +@Experimental +class KSTestResult private[stat] (override val pValue: Double, + override val statistic: Double, --- End diff -- Indent these back to be consistent with the `ChiSqTestResult` definition
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33194722 --- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala --- @@ -25,6 +25,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping import org.apache.hadoop.yarn.api.records._ import org.apache.hadoop.yarn.client.api.AMRMClient import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest +import org.scalatest.{BeforeAndAfterEach, Matchers} --- End diff -- Any reason for moving these imports?
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33195618 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] trait ContainerPlacementStrategy { + type Locality = Array[String] + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * + * @return node localities and rack localities, each locality is an array of string, + * the length of localities is the same as number of containers + */ + def localityOfRequestedContainers(numContainer: Int, numLocalityAwarePendingTasks: Int, --- End diff -- When arguments overflow a single line, each gets its own line.
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33195574 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] trait ContainerPlacementStrategy { + type Locality = Array[String] + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * + * @return node localities and rack localities, each locality is an array of string, + * the length of localities is the same as number of containers + */ + def localityOfRequestedContainers(numContainer: Int, numLocalityAwarePendingTasks: Int, + preferredLocalityToCount: Map[String, Int]): (Array[Locality], Array[Locality]) +} + +/** + * This strategy calculates the preferred localities by considering the node ratio of pending --- End diff -- Can we mention here what this strategy is trying to optimize?
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33191664 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala --- @@ -158,4 +158,44 @@ object Statistics { def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = { ChiSqTest.chiSquaredFeatures(data) } + + /** + * Conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability distribution equality + * @param data an `RDD[Double]` containing the sample of data to test + * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value + * @return KSTestResult object containing test statistic, p-value, and null hypothesis. + */ + def ksTest(data: RDD[Double], cdf: Double => Double): KSTestResult = { + KSTest.testOneSample(data, cdf) + } + + /** + * Conduct a one-sample, two-sided Kolmogorov-Smirnov test for probability distribution equality, + * which creates only 1 distribution object per partition (useful in conjunction with Apache + * Commons Math distributions) + * @param data an `RDD[Double]` containing the sample of data to test + * @param distCalc an `Iterator[(Double, Double, Double)] => Iterator[Double]` function, to + * calculate the distance between empirical values and theoretical values of + * a distribution. The first element corresponds to the value x, the second + * element is the lower bound of the empirical CDF, while the third element is + * the upper bound. Thus if we call the triple associated with an observation T, the + * KS distance at that point is max(Pr[X <= T._1] - T._2, T._3 - Pr[X <= T._1]) + * @return KSTestResult object containing test statistic, p-value, and null hypothesis. + */ + def ksTestOpt(data: RDD[Double], --- End diff -- I think I'd leave this API out on the first pass, as, while there are definitely situations where it's useful, it's likely to be confusing to users. We can always add it in later if there's demand.
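The `(value, ecdfLower, ecdfUpper)` triple described for `distCalc` above gives a per-observation distance of max(CDF(x) - lower, upper - CDF(x)). A quick Python illustration of that formula, with hypothetical names (not the proposed Spark API):

```python
def ks_distance(triple, cdf):
    """KS distance contributed by one observation, given its ECDF bounds.

    `triple` is (x, ecdf_lower, ecdf_upper): the observed value, the ECDF
    just below it, and the ECDF at it."""
    x, lower, upper = triple
    return max(cdf(x) - lower, upper - cdf(x))

# Uniform(0, 1) theoretical CDF as the comparison distribution.
uniform_cdf = lambda x: min(max(x, 0.0), 1.0)

# An observation at 0.5 sitting between ECDF bounds 0.2 and 0.4:
# the larger gap is on the lower side, 0.5 - 0.2 = 0.3.
d = ks_distance((0.5, 0.2, 0.4), uniform_cdf)
```

Taking the maximum of `ks_distance` over all observations yields the test statistic, which is why the triple form lets the distance be computed with one distribution object per partition.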
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33195453 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] trait ContainerPlacementStrategy { + type Locality = Array[String] --- End diff -- If I understand correctly, a pair of localities is used for a single ContainerRequest. If this is the case, it might be most clear to remove the locality type, and replace it with a ContainerLocalityPreferences class that has two fields: `nodes: Array[String]` and `racks: Array[String]`. Then `localityOfRequestedContainers` could just return an Array of these.
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r33196367 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] trait ContainerPlacementStrategy { + type Locality = Array[String] + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * + * @return node localities and rack localities, each locality is an array of string, + * the length of localities is the same as number of containers + */ + def localityOfRequestedContainers(numContainer: Int, numLocalityAwarePendingTasks: Int, + preferredLocalityToCount: Map[String, Int]): (Array[Locality], Array[Locality]) +} + +/** + * This strategy calculates the preferred localities by considering the node ratio of pending --- End diff -- Which I think is something like: If YARN were to give us containers that satisfied our locality preferences, and we were to fill them up with tasks until all tasks completed, we're trying to maximize the number of tasks that would run locally. (this statement maybe needs to be revised to account for already-running containers and already-running tasks) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
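The objective described above could be made concrete with a small scoring function (purely illustrative; the function name, the `slotsPerContainer` parameter, and the container-to-host assignment map are all assumptions, not part of the PR):

```scala
// Illustrative only: given a hypothetical assignment of containers to hosts and a
// map of host -> number of pending tasks preferring that host, count how many tasks
// could run node-locally if each container runs up to `slotsPerContainer` tasks.
// A placement strategy would try to choose `containersPerHost` to maximize this.
def localTasksServed(
    containersPerHost: Map[String, Int],
    preferredLocalityToCount: Map[String, Int],
    slotsPerContainer: Int): Int = {
  preferredLocalityToCount.map { case (host, taskCount) =>
    val localCapacity = containersPerHost.getOrElse(host, 0) * slotsPerContainer
    math.min(taskCount, localCapacity)
  }.sum
}
```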
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6994#issuecomment-115113205 jenkins, test this please
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6994#issuecomment-115113205 jenkins, test this please
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33224064 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.NormalDistribution +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.rdd.RDD + + +/** + * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on KS Test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test + */ +private[stat] object KSTest { + + // Null hypothesis for the type of KS test to be included in the result. + object NullHypothesis extends Enumeration { +type NullHypothesis = Value +val oneSampleTwoSided = Value(Sample follows theoretical distribution.) 
+ } + + /** + * Calculate empirical cumulative distribution values needed for KS statistic + * @param data `RDD[Double]` on which to calculate empirical cumulative distribution values + * @return and RDD of (Double, Double, Double), where the first element in each tuple is the + * value, the second element is the ECDFV - 1 /n, and the third element is the ECDFV, + * where ECDF stands for empirical cumulative distribution function value + */ + def empirical(data: RDD[Double]): RDD[(Double, Double, Double)] = { +val n = data.count().toDouble +data.sortBy(x = x).zipWithIndex().map { case (v, i) = (v, i / n, (i + 1) / n) } + } + + /** + * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution + * @param dat `RDD[Double]` to evaluate + * @param cdf `Double = Double` function to calculate the theoretical CDF + * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis) + */ + def testOneSample(dat: RDD[Double], cdf: Double = Double): KSTestResult = { +val empiriRDD = empirical(dat) // empirical distribution +val distances = empiriRDD.map { +case (v, dl, dp) = + val cdfVal = cdf(v) + Math.max(cdfVal - dl, dp - cdfVal) + } +val ksStat = distances.max() +evalOneSampleP(ksStat, distances.count()) --- End diff -- Can we reuse the count that was computed in `empirical`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
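One way to address the count-reuse comment is to compute `n` once and thread it through alongside the ECDF triples. A minimal sketch with plain collections rather than an `RDD` (the `empiricalWithCount` name is hypothetical; the triple layout matches the `empirical` method quoted above):

```scala
// Sketch: compute the count once and return it with the ECDF triples, so the
// caller (testOneSample) does not trigger a second count. Each triple is
// (value, ECDF - 1/n, ECDF), mirroring the RDD version quoted in the diff.
def empiricalWithCount(data: Seq[Double]): (Long, Seq[(Double, Double, Double)]) = {
  val n = data.size.toLong
  val ecdf = data.sorted.zipWithIndex.map { case (v, i) =>
    (v, i.toDouble / n, (i + 1).toDouble / n)
  }
  (n, ecdf)
}
```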
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33223955 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.NormalDistribution +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.rdd.RDD + + +/** + * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on KS Test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test + */ +private[stat] object KSTest { + + // Null hypothesis for the type of KS test to be included in the result. + object NullHypothesis extends Enumeration { +type NullHypothesis = Value +val oneSampleTwoSided = Value(Sample follows theoretical distribution.) 
+ } + + /** + * Calculate empirical cumulative distribution values needed for KS statistic + * @param data `RDD[Double]` on which to calculate empirical cumulative distribution values + * @return and RDD of (Double, Double, Double), where the first element in each tuple is the + * value, the second element is the ECDFV - 1 /n, and the third element is the ECDFV, + * where ECDF stands for empirical cumulative distribution function value + */ + def empirical(data: RDD[Double]): RDD[(Double, Double, Double)] = { +val n = data.count().toDouble +data.sortBy(x = x).zipWithIndex().map { case (v, i) = (v, i / n, (i + 1) / n) } + } + + /** + * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution + * @param dat `RDD[Double]` to evaluate + * @param cdf `Double = Double` function to calculate the theoretical CDF + * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis) + */ + def testOneSample(dat: RDD[Double], cdf: Double = Double): KSTestResult = { --- End diff -- nit: data instead of dat --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8402][MLLIB] DP Means Clustering
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6880#discussion_r33174171 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/DpMeansModel.scala --- @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.clustering + +import scala.collection.JavaConverters._ +import scala.collection.mutable + +import org.json4s.DefaultFormats +import org.json4s.JsonDSL._ +import org.json4s.jackson.JsonMethods._ + +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.pmml.PMMLExportable +import org.apache.spark.mllib.util.{Loader, Saveable} +import org.apache.spark.rdd.RDD +import org.apache.spark.SparkContext +import org.apache.spark.sql.{Row, SQLContext} + +/** + * A clustering model for DP means. Each point belongs to the cluster with the closest center. + */ +class DpMeansModel --- End diff -- I don't know if there are plans, just thought it might be a good idea now that three's a crowd. Probably best to wait for @mengxr or @jkbradley to weigh in before making changes. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33188946 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.NormalDistribution +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.{SparkException, Logging} --- End diff -- Alphabetize the imports in here --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33188987 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.NormalDistribution +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.{SparkException, Logging} +import org.apache.spark.rdd.RDD + + +/** + * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on KS Test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test + */ + private[stat] object KSTest { + + // Null hypothesis for the type of KS test to be included in the result. 
+ object NullHypothesis extends Enumeration { +type NullHypothesis = Value +val oneSampleTwoSided = Value(Sample follows theoretical distribution.) + } + + /** + * Calculate empirical cumulative distribution values needed for KS statistic + * @param dat `RDD[Double]` on which to calculate empirical cumulative distribution values + * @return and RDD of (Double, Double, Double), where the first element in each tuple is the + * value, the second element is the ECDFV - 1 /n, and the third element is the ECDFV, + * where ECDF stands for empirical cumulative distribution function value + * + */ + def empirical(dat: RDD[Double]): RDD[(Double, Double, Double)] = { --- End diff -- Nit: replace dat with data --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33189226 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.NormalDistribution +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.{SparkException, Logging} +import org.apache.spark.rdd.RDD + + +/** + * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on KS Test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test + */ + private[stat] object KSTest { + + // Null hypothesis for the type of KS test to be included in the result. 
+ object NullHypothesis extends Enumeration { +type NullHypothesis = Value +val oneSampleTwoSided = Value(Sample follows theoretical distribution.) + } + + /** + * Calculate empirical cumulative distribution values needed for KS statistic + * @param dat `RDD[Double]` on which to calculate empirical cumulative distribution values + * @return and RDD of (Double, Double, Double), where the first element in each tuple is the + * value, the second element is the ECDFV - 1 /n, and the third element is the ECDFV, + * where ECDF stands for empirical cumulative distribution function value + * --- End diff -- extra newline --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33189172 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.NormalDistribution +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.{SparkException, Logging} +import org.apache.spark.rdd.RDD + + +/** + * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on KS Test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test + */ + private[stat] object KSTest { --- End diff -- indent this back two spaces --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33189336 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.NormalDistribution +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.{SparkException, Logging} +import org.apache.spark.rdd.RDD + + +/** + * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on KS Test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test + */ + private[stat] object KSTest { + + // Null hypothesis for the type of KS test to be included in the result. 
+ object NullHypothesis extends Enumeration { +type NullHypothesis = Value +val oneSampleTwoSided = Value(Sample follows theoretical distribution.) + } + + /** + * Calculate empirical cumulative distribution values needed for KS statistic + * @param dat `RDD[Double]` on which to calculate empirical cumulative distribution values + * @return and RDD of (Double, Double, Double), where the first element in each tuple is the + * value, the second element is the ECDFV - 1 /n, and the third element is the ECDFV, + * where ECDF stands for empirical cumulative distribution function value + * + */ + def empirical(dat: RDD[Double]): RDD[(Double, Double, Double)] = { +val n = dat.count().toDouble +dat.sortBy(x = x).zipWithIndex().map { case (v, i) = (v, i / n, (i + 1) / n) } + } + + /** + * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution + * @param dat `RDD[Double]` to evaluate + * @param cdf `Double = Double` function to calculate the theoretical CDF + * @return a KSTestResult summarizing the test results (pval, statistic, and null hypothesis) + */ + def testOneSample(dat: RDD[Double], cdf: Double = Double): KSTestResult = { +val empiriRDD = empirical(dat) // empirical distribution +val distances = empiriRDD.map { +case (v, dl, dp) = + val cdfVal = cdf(v) + Math.max(cdfVal - dl, dp - cdfVal) + } +val ksStat = distances.max() +evalOneSampleP(ksStat, distances.count()) + } + + /** + * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution. Optimized + * such that each partition runs a separate mapping operation. This can help in cases where the + * CDF calculation involves creating an object. By using this implementation we can make sure + * only 1 object is created per partition, versus 1 per observation. 
+ * @param dat `RDD[Double]` to evaluate + * @param distCalc a function to calculate the distance between the empirical values and the + * theoretical value + * @return a KSTestResult summarizing the test results (pval, statistic, and null hypothesis) + */ + def testOneSampleOpt(dat: RDD[Double], + distCalc: Iterator[(Double, Double, Double)] => Iterator[Double]) --- End diff -- `distCalc` should be indented four spaces past `def` and `: KSTestResult` should be indented two spaces past `def`
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33189576 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala --- @@ -153,4 +157,61 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext { Statistics.chiSqTest(sc.parallelize(continuousFeature, 2)) } } + + test("kolmogorov smirnov test empirical distributions") { + --- End diff -- unnecessary newline
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33189552 --- Diff: mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala --- @@ -19,6 +19,10 @@ package org.apache.spark.mllib.stat import java.util.Random +import org.apache.commons.math3.distribution.{ExponentialDistribution, +NormalDistribution, UniformRealDistribution} --- End diff -- Indent two spaces forward
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33189747 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala --- @@ -158,4 +158,44 @@ object Statistics { def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = { ChiSqTest.chiSquaredFeatures(data) } + + /** + * Conduct a one-sample, two sided Kolmogorov Smirnov test for probability distribution equality + * @param data an `RDD[Double]` containing the sample of data to test + * @param cdf a `Double = Double` function to calculate the theoretical CDF at a given value + * @return KSTestResult object containing test statistic, p-value, and null hypothesis. + */ + def ksTest(data: RDD[Double], cdf: Double = Double): KSTestResult = { +KSTest.testOneSample(data, cdf) + } + + /** + * Conduct a one-sample, two sided Kolmogorov Smirnov test for probability distribution equality, + * which creates only 1 distribution object per partition (useful in conjunction with Apache + * Commons Math distributions) + * @param data an `RDD[Double]` containing the sample of data to test + * @param distCalc a `Iterator[(Double, Double, Double)] = Iterator[Double]` function, to + * calculate the distance between empirical values and theoretical values of + * a distribution. The first element corresponds to the value x, the second + * element is the lower bound of the empirical CDF, while the third element is + * the upper bound. Thus if we call triple associated with an observation T, the + * KS distance at that point is max(Pr[X = T._1] - T._2, T._3 - Pr[X = T._1]) + * @return KSTestResult object containing test statistic, p-value, and null hypothesis. 
+ */ + def ksTestOpt(data: RDD[Double], + distCalc: Iterator[(Double, Double, Double)] => Iterator[Double]) + : KSTestResult = { +KSTest.testOneSampleOpt(data, distCalc) + } + + /** + * A convenience method to conduct a one-sample, two sided Kolmogorov Smirnov test for probability + * distribution equality + * @param data an `RDD[Double]` containing the sample of data to test + * @param name a `String` name for a theoretical distribution + * @return KSTestResult object containing test statistic, p-value, and null hypothesis. + */ + def ksTest(data: RDD[Double], name: String): KSTestResult = { --- End diff -- Put this up below the other `ksTest` method
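The per-partition distance calculation described in the `ksTestOpt` doc comment can be sketched without Spark as follows (the `makeDistCalc` helper name is hypothetical; the triple layout and the distance formula `max(cdf(v) - lower, upper - cdf(v))` come from the quoted javadoc):

```scala
// Sketch of the distCalc contract: build one distance calculator per partition,
// so any expensive CDF object is constructed once per partition rather than
// once per observation. Each input triple (v, dl, dp) holds the value, the
// lower ECDF bound, and the upper ECDF bound; the KS distance at v is
// max(cdf(v) - dl, dp - cdf(v)).
def makeDistCalc(cdf: Double => Double)
    : Iterator[(Double, Double, Double)] => Iterator[Double] = { partition =>
  partition.map { case (v, dl, dp) =>
    val c = cdf(v)
    math.max(c - dl, dp - c)
  }
}
```

With Spark, the resulting function would be what gets passed to `mapPartitions` inside `testOneSampleOpt`.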
[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6994#discussion_r33190190 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala --- @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.stat.test + +import org.apache.commons.math3.distribution.NormalDistribution +import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest + +import org.apache.spark.{SparkException, Logging} +import org.apache.spark.rdd.RDD + + +/** + * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a + * continuous distribution. By comparing the largest difference between the empirical cumulative + * distribution of the sample data and the theoretical distribution we can provide a test for the + * the null hypothesis that the sample data comes from that theoretical distribution. + * For more information on KS Test: https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test + */ + private[stat] object KSTest { + + // Null hypothesis for the type of KS test to be included in the result. 
+  object NullHypothesis extends Enumeration {
+    type NullHypothesis = Value
+    val oneSampleTwoSided = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * Calculate empirical cumulative distribution values needed for the KS statistic
+   * @param dat `RDD[Double]` on which to calculate empirical cumulative distribution values
+   * @return an RDD of (Double, Double, Double), where the first element in each tuple is the
+   *         value, the second element is the ECDFV - 1/n, and the third element is the ECDFV,
+   *         where ECDFV stands for empirical cumulative distribution function value
+   */
+  def empirical(dat: RDD[Double]): RDD[(Double, Double, Double)] = {
+    val n = dat.count().toDouble
+    dat.sortBy(x => x).zipWithIndex().map { case (v, i) => (v, i / n, (i + 1) / n) }
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution
+   * @param dat `RDD[Double]` to evaluate
+   * @param cdf `Double => Double` function to calculate the theoretical CDF
+   * @return a KSTestResult summarizing the test results (p-value, statistic, and null hypothesis)
+   */
+  def testOneSample(dat: RDD[Double], cdf: Double => Double): KSTestResult = {
+    val empiriRDD = empirical(dat) // empirical distribution
+    val distances = empiriRDD.map {
+      case (v, dl, dp) =>
+        val cdfVal = cdf(v)
+        Math.max(cdfVal - dl, dp - cdfVal)
+    }
+    val ksStat = distances.max()
+    evalOneSampleP(ksStat, distances.count())
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution. Optimized
+   * so that each partition runs a separate mapping operation. This can help in cases where the
+   * CDF calculation involves creating an object. By using this implementation we can make sure
+   * only 1 object is created per partition, versus 1 per observation.
+   * @param dat `RDD[Double]` to evaluate
+   * @param distCalc a function to calculate the distance between the empirical values and the
+   *                 theoretical value
+   * @return a KSTestResult summarizing the test results (p-value, statistic, and null hypothesis)
+   */
+  def testOneSampleOpt(dat: RDD[Double],
+      distCalc: Iterator[(Double, Double, Double)] => Iterator[Double])
+    : KSTestResult = {
+    val empiriRDD = empirical(dat) // empirical distribution information
+    val distances = empiriRDD.mapPartitions(distCalc, false)
+    val ksStat = distances.max
+    evalOneSampleP(ksStat, distances.count())
+  }
+
+  /**
+   * Returns a function to calculate the KS test with a standard normal distribution,
+   * to be used with testOneSampleOpt
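The empirical-distance formulation used in `testOneSample` above can be sketched outside Spark. This is a minimal Python illustration of the same math (sort the sample, compare the theoretical CDF against both sides of each ECDF step); `normal_cdf` and `ks_statistic` are illustrative names, not part of the patch:

```python
import math

def normal_cdf(x, mu=0.0, sigma=1.0):
    # Standard normal CDF expressed via the error function.
    return 0.5 * (1.0 + math.erf((x - mu) / (sigma * math.sqrt(2.0))))

def ks_statistic(sample, cdf):
    # For the i-th sorted value v (0-indexed), the ECDF jumps from i/n to
    # (i + 1)/n at v; the KS distance at v is the larger of the two gaps
    # between the theoretical CDF and those step values, mirroring
    # Math.max(cdfVal - dl, dp - cdfVal) in the diff.
    n = float(len(sample))
    distances = []
    for i, v in enumerate(sorted(sample)):
        c = cdf(v)
        distances.append(max(c - i / n, (i + 1) / n - c))
    return max(distances)
```

A sample drawn far from the hypothesized distribution pushes the statistic toward 1, while a plausible sample keeps it small; the p-value step (`evalOneSampleP` in the diff) is a separate calculation.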
[GitHub] spark pull request: [SPARK-8402][MLLIB] DP Means Clustering
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6880#discussion_r32793919

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/clustering/DpMeansModel.scala ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.pmml.PMMLExportable
+import org.apache.spark.mllib.util.{Loader, Saveable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SQLContext}
+
+/**
+ * A clustering model for DP means. Each point belongs to the cluster with the closest center.
+ */
+class DpMeansModel
--- End diff --

Is there any difference between this and a `KMeansModel`? Might we be able to consolidate them into something like a `ClusterCentersModel`?
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r32776975

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala ---
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] trait ContainerPlacementStrategy {
+  type Locality = Array[String]
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @return node localities and rack localities; each locality is an array of strings, and
+   *         the number of localities is the same as the number of containers
+   */
+  def localityOfRequestedContainers(numContainer: Int): (Array[Locality], Array[Locality])
--- End diff --

Can we put all the information that's used to determine the locality requests as arguments to this function? It will make it clearer for later readers to understand what's used in this calculation.
[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6679#issuecomment-113322897

@JoshRosen this should be ready for merge
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6394#issuecomment-112981135

Hi @jerryshao, is this ready for review?
[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6679#issuecomment-110994319

retest this please
[GitHub] spark pull request: [SPARK-2774] Set preferred locations for reduc...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6652#issuecomment-110566851

@shivaram @kayousterhout this approach addresses my concerns. Thanks for updating!
[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6679#issuecomment-110571366

I definitely don't think we rely on it in Spark. On Cloudera setups, as well as presumably Hortonworks and MapR setups, client configurations are synchronized globally across nodes, so this discrepancy couldn't occur.
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r31962024

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -438,6 +519,15 @@ private[yarn] class YarnAllocator(
     amClient.releaseAssignedContainer(container.getId())
   }

+  private def rackLookUp(host: String, conf: Configuration): String = {
+    hostToRackNameCache.get(host) match {
--- End diff --

Have you noticed a need for this caching? RackResolver should be doing its own caching, so this shouldn't be needed.
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r31973076

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -225,12 +243,74 @@ private[yarn] class YarnAllocator(
     logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
       s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")

-    for (i <- 0 until missing) {
-      val request = createContainerRequest(resource)
-      amClient.addContainerRequest(request)
-      val nodes = request.getNodes
-      val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
-      logInfo(s"Container request (host: $hostStr, capability: $resource)")
+    // Calculate the number of executors we expect to satisfy all the locality-preferred tasks
--- End diff --

One possible issue with the approach here is that our locality preferences might shift when a stage completes and a new stage comes in. If we have a set of outstanding requests, it would be nice if we could cancel requests for the locations that we no longer need and place our requests on nodes that we now care about.

One way we could achieve this is, each time we call `updateResourceRequests`, remove all the unsatisfied container requests from amClient and resubmit requests based on our current needs. I.e., after removal, `missing` would always be `targetNumExecutors - numExecutorsRunning`.
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r31972804

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -225,12 +243,74 @@ private[yarn] class YarnAllocator(
     logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
       s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")

-    for (i <- 0 until missing) {
-      val request = createContainerRequest(resource)
-      amClient.addContainerRequest(request)
-      val nodes = request.getNodes
-      val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
-      logInfo(s"Container request (host: $hostStr, capability: $resource)")
+    // Calculate the number of executors we expect to satisfy all the locality-preferred tasks
+    val localityAwareTaskCores = localityAwarePendingTaskNum * CPUS_PER_TASK
+    val expectedLocalityAwareContainerNum =
+      (localityAwareTaskCores + resource.getVirtualCores - 1) / resource.getVirtualCores
+
+    // Get all the existing, locality-matched containers
+    val existedMatchedContainers = allocatedHostToContainersMap.filter { case (host, _) =>
+      preferredLocalityToCounts.contains(host)
+    }
+    val existedMatchedContainerNum = existedMatchedContainers.values.map(_.size).sum
+
+    // The number of containers to allocate, divided into two groups: one with node locality,
+    // and the other without locality preference.
+    var requiredLocalityFreeContainerNum: Int = 0
+    var requiredLocalityAwareContainerNum: Int = 0
+
+    if (expectedLocalityAwareContainerNum <= existedMatchedContainerNum) {
+      // If the currently allocated executors can satisfy all the locality-preferred tasks,
--- End diff --

This is a little weird to me. IIUC, what we're saying here is:
* Find all the containers on all the nodes that have at least one task that would be happy to be there.
* If, taken together, these containers have enough capacity to run all pending tasks with locality preferences, none of the executor requests we submit need to have locality preferences.

This would result in sub-optimal behavior in the following situation:
* We have 48 tasks that have locality preferences distributed across a wide number of nodes, including one task that can run on either node 1 or node 2.
* Node 1 and node 2 each have 6 executors with 4 cores each.

In this situation, we'd end up giving up on locality-based requests, even though it would make sense to request executors on some of the nodes that the tasks want to be on.
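The ceiling division quoted in the diff, `(localityAwareTaskCores + vcores - 1) / vcores`, can be checked standalone. A minimal sketch (Python for brevity; the function and parameter names are illustrative, not from the PR):

```python
def expected_locality_aware_containers(pending_tasks, cpus_per_task, vcores_per_container):
    # Number of executor containers needed to give every locality-preferring
    # pending task a core, computed with integer ceiling division exactly as
    # in the quoted diff: (cores + vcores - 1) / vcores.
    cores_needed = pending_tasks * cpus_per_task
    return (cores_needed + vcores_per_container - 1) // vcores_per_container
```

For the 48-task example in the comment, with 1 CPU per task and 4-core executors, this yields 12 containers, which is exactly the capacity node 1 and node 2 together already hold in the scenario above (12 executors x 4 cores covers 48 task-cores), so the locality preference would be dropped entirely.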
[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6679#issuecomment-109890835

Ah, yeah, that was the change I was referring to. I'm not sure about the Mesos deployment model, but on Standalone mode at least, it would be possible to manufacture a situation where the behavior changes. That situation is Example 2 outlined above. This difference relies on config files with different client configs being distributed to the client and to the cluster nodes.

I would argue that changing behavior in this way is OK, basically because the current behavior is wrong. That is, client configs should come from the client and not from the cluster.
[GitHub] spark pull request: [SPARK-8099] set executor cores into system in...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6643#issuecomment-109383319

LGTM, merging
[GitHub] spark pull request: [SPARK-2774] Set preferred locations for reduc...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6652#issuecomment-109397104

When there is no skew, are there situations where this would lead to worse performance? E.g. will it make tasks bunch up on nodes more than before and/or result in scheduling delays?
[GitHub] spark pull request: [SPARK-2774] Set preferred locations for reduc...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6652#issuecomment-109398051

Also, is there an intuitive justification for why 5 is a good number? It seems a little weird to me that it's independent of the number of tasks, the cluster size, and the amount of skew. Have you considered a rule like "if X% of the data falls on Y% of the nodes, mark those as preferred", where X and Y could be 80/20 or 70/30?
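The "X% of the data on Y% of the nodes" rule suggested above could look something like this sketch (Python for brevity; `preferred_locations` and its thresholds are illustrative, not part of any PR):

```python
def preferred_locations(bytes_by_host, data_fraction=0.8, node_fraction=0.2):
    # Mark hosts as preferred only when a small fraction of the hosts
    # (`node_fraction`) holds a large fraction of the map output
    # (`data_fraction`); with no skew, return no preferences at all.
    total = sum(bytes_by_host.values())
    if total == 0:
        return []
    hosts = sorted(bytes_by_host, key=bytes_by_host.get, reverse=True)
    budget = max(1, int(len(hosts) * node_fraction))
    top = hosts[:budget]
    covered = sum(bytes_by_host[h] for h in top)
    return top if covered >= data_fraction * total else []
```

With skewed output the heavy host is preferred; with evenly spread output the function returns an empty list, so the scheduler would place tasks freely instead of bunching them on arbitrarily chosen nodes.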
[GitHub] spark pull request: [SPARK-7699][Core] Lazy start the scheduler fo...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6430#issuecomment-109395777

This looks right to me. Merging.
[GitHub] spark pull request: [SPARK-2774] Set preferred locations for reduc...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6652#issuecomment-109450997

Ahhh, I misunderstood and thought that all the reduce tasks for a stage got the same locality preferences. I withdraw my concern.
[GitHub] spark pull request: [SPARK-2774] Set preferred locations for reduc...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6652#issuecomment-109445990

On the other side, I can also envision situations where it would be really helpful to request a higher number of preferred locations. Consider a large YARN cluster where an app gets executors on a small subset of the nodes. The app is idle for a while after running an initial job, so the initial set of executors go away. The app wakes up later and wants to use the output of its first stage for another job. If the scheduler could request tasks on the nodes housing the outputs of the first stage, with SPARK-4352, the ExecutorAllocationManager would be able to request executors on these nodes.

This probably makes sense as later work, but still thought it could be worth bringing up as part of the discussion.
[GitHub] spark pull request: [SPARK-2774] Set preferred locations for reduc...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6652#issuecomment-109444936

My worry is the following situation:
* Map outputs from stage 1 exhibit little skew and are distributed evenly across nodes.
* 5 nodes are chosen as preferred for stage 2. The choice is basically random because of the lack of skew.
* Stage 2 has fewer tasks than the number of slots.
* Instead of stage 2's tasks being evenly distributed across the cluster, all of them end up on the 5 nodes, which are more likely to become bottlenecked on incoming network traffic.

This isn't a disaster situation by any means. But it also seems like it could be easily avoided with a policy that only prefers locations when it would make a difference.
[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6679#issuecomment-109493444

Where do we end up cloning Configuration objects? With these changes, we avoid loading defaults when we reconstitute Configuration objects from bytes. Are there hot paths where we create Configuration objects from other live Configuration objects?
[GitHub] spark pull request: SPARK-8135. In SerializableWritable, don't loa...
GitHub user sryza opened a pull request: https://github.com/apache/spark/pull/6679

SPARK-8135. In SerializableWritable, don't load defaults when instant… …iating Configuration

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sryza/spark sandy-spark-8135

Alternatively you can review and apply these changes as the patch at: https://github.com/apache/spark/pull/6679.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #6679

commit ca6554357e951807bb9bbb8c2a647b74d6954d68
Author: Sandy Ryza <sa...@cloudera.com>
Date: 2015-06-05T23:28:45Z

    SPARK-8135. In SerializableWritable, don't load defaults when instantiating Configuration
[GitHub] spark pull request: [SPARK-8136][YARN] Fix flakiness in YarnCluste...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6680#issuecomment-109502268

Does this comment still apply: `// If we are running in yarn-cluster mode, verify that driver logs are downloadable.`?
[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6679#issuecomment-109502593

Makes sense. In that case, this should be ready for review.
[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6679#issuecomment-109499883

In light of this change, do you think we should remove the broadcasting of Configurations? While we avoid the much larger cost of reading and parsing XML for each task, we would still pay the cost of turning bytes into Configuration objects.
[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6679#issuecomment-109499640

There could be a behavior change in situations where the executor somehow has a different Hadoop configuration file than the driver, but I think it's the right change. I started to explain this stuff abstractly, but I think it might be easier to just put down some examples:

**Example 1**
* core-site.xml on the driver contains optionA=value1
* core-site.xml on the executor contains optionA=value2
* Old behavior: on the executor, `conf.get("optionA")` returns value1
* New behavior: same as old behavior

**Example 2**
* core-site.xml on the driver does not contain optionA
* core-site.xml on the executor contains optionA=value1
* Old behavior: on the executor, `conf.get("optionA")` returns value1
* New behavior: on the executor, `conf.get("optionA")` returns null

I can't find the JIRA, but I believe there was a recent change by @vanzin that made it so that the executor would use a copy of the Hadoop configuration files used on the driver. When that is the case, neither example 1 nor example 2 can occur.
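The two examples can be modeled with a toy stand-in for Hadoop's `Configuration` (a hedged Python sketch, not the real API; the dicts stand in for the serialized driver entries and the executor-local core-site.xml):

```python
def reconstituted_conf(driver_entries, executor_defaults, load_defaults):
    # Rough model of reconstituting a Configuration from bytes on an executor:
    # the executor-local default file is consulted only when loadDefaults is
    # true (old behavior); explicitly serialized driver entries always win.
    merged = dict(executor_defaults) if load_defaults else {}
    merged.update(driver_entries)
    return merged

# Example 2 from the comment: optionA exists only in the executor-local file.
driver_entries = {}                          # driver core-site.xml lacks optionA
executor_defaults = {"optionA": "value1"}    # executor core-site.xml has it

old = reconstituted_conf(driver_entries, executor_defaults, load_defaults=True)
new = reconstituted_conf(driver_entries, executor_defaults, load_defaults=False)
```

In this model, the old behavior surfaces the executor-local value while the new behavior returns nothing, matching Example 2; in Example 1, the explicit driver value shadows the executor default either way.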