[GitHub] spark pull request: [SPARK-8137][core] Improve treeAggregate to co...

2015-07-17 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7461#issuecomment-122347375
  
Small thing, but can the title of this PR be changed to "...combine all 
data on *each* executor..." to clarify that all the data isn't going to a 
single executor?





[GitHub] spark pull request: [SPARK-8674] [MLlib] [WIP] Implementation of a...

2015-07-13 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7075#issuecomment-121080323
  
Hey @josepablocam can you rebase this on current master?





[GitHub] spark pull request: [SPARK-8958] Dynamic allocation: change cached...

2015-07-09 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7329#issuecomment-120138836
  
LGTM





[GitHub] spark pull request: [SPARK-8884] [MLlib] [WIP] 1-sample Anderson-D...

2015-07-09 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7278#issuecomment-120179329
  
It's also worth mentioning that it's implemented in the common stats 
packages for scipy:

http://docs.scipy.org/doc/scipy-0.15.1/reference/generated/scipy.stats.anderson.html

and Matlab:
http://www.mathworks.com/help/stats/adtest.html

and SAS:

http://support.sas.com/documentation/cdl/en/procstat/63104/HTML/default/viewer.htm#procstat_univariate_sect037.htm





[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r34200738
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -153,4 +157,101 @@ class HypothesisTestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
   Statistics.chiSqTest(sc.parallelize(continuousFeature, 2))
 }
   }
+
+  test("1 sample Kolmogorov-Smirnov test") {
+// Create theoretical distributions
+val stdNormalDist = new NormalDistribution(0, 1)
+val expDist = new ExponentialDistribution(0.6)
+val unifDist = new UniformRealDistribution()
+
+// set seeds
+val seed = 10L
+stdNormalDist.reseedRandomGenerator(seed)
+expDist.reseedRandomGenerator(seed)
+unifDist.reseedRandomGenerator(seed)
+
+// Sample data from the distributions and parallelize it
+val n = 10
+val sampledNorm = sc.parallelize(stdNormalDist.sample(n), 10)
+val sampledExp = sc.parallelize(expDist.sample(n), 10)
+val sampledUnif = sc.parallelize(unifDist.sample(n), 10)
+
+// Use an Apache Commons Math local KS test to verify calculations
+val ksTest = new KolmogorovSmirnovTest()
+val pThreshold = 0.05
+
+// Comparing a standard normal sample to a standard normal distribution
+val result1 = Statistics.ksTest(sampledNorm, "stdnorm")
+val referenceStat1 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, 
sampledNorm.collect())
+val referencePVal1 = 1 - ksTest.cdf(referenceStat1, n)
+// Verify vs apache math commons ks test
+assert(result1.statistic === referenceStat1)
+assert(result1.pValue === referencePVal1)
+// Cannot reject null hypothesis
+assert(result1.pValue > pThreshold)
+
+// Comparing an exponential sample to a standard normal distribution
+val result2 = Statistics.ksTest(sampledExp, "stdnorm")
+val referenceStat2 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, 
sampledExp.collect())
+val referencePVal2 = 1 - ksTest.cdf(referenceStat2, n)
+// verify vs apache math commons ks test
+assert(result2.statistic === referenceStat2)
+assert(result2.pValue === referencePVal2)
+// reject null hypothesis
+assert(result2.pValue < pThreshold)
+
+// Testing the use of a user provided CDF function
+// Distribution is not serializable, so will have to create in the 
lambda
+val expCDF = (x: Double) => new 
ExponentialDistribution(0.2).cumulativeProbability(x)
+
+// Comparing an exponential sample with mean X to an exponential 
distribution with mean Y
+// Where X != Y
+val result3 = Statistics.ksTest(sampledExp, expCDF)
+val referenceStat3 = ksTest.kolmogorovSmirnovStatistic(new 
ExponentialDistribution(0.2),
+  sampledExp.collect())
+val referencePVal3 = 1 - ksTest.cdf(referenceStat3, 
sampledNorm.count().toInt)
+// verify vs apache math commons ks test
+assert(result3.statistic === referenceStat3)
+assert(result3.pValue === referencePVal3)
+// reject null hypothesis
+assert(result3.pValue < pThreshold)
+
+/*
+ Comparing results with R's implementation of Kolmogorov-Smirnov for 1 
sample
+ > sessionInfo()
+ R version 3.2.0 (2015-04-16)
+ Platform: x86_64-apple-darwin13.4.0 (64-bit)
+ > set.seed(20)
+ > v <- rnorm(20)
+ > v
+  [1]  1.16268529 -0.58592447  1.78546500 -1.33259371 -0.44656677  0.56960612
+  [7] -2.88971761 -0.86901834 -0.46170268 -0.4091 -0.02013537 -0.15038222
+ [13] -0.62812676  1.32322085 -1.52135057 -0.43742787  0.97057758  0.02822264
+ [19] -0.08578219  0.38921440
+ > ks.test(v, pnorm, alternative = "two.sided")
+
+ One-sample Kolmogorov-Smirnov test
+
+ data:  v
+ D = 0.18874, p-value = 0.4223
+ alternative hypothesis: two-sided
+*/
+
+val RKSStat = 0.18874
+val RKSPVal = 0.4223
+val RData = sc.parallelize(
+Array(
+  1.1626852897838, -0.585924465893051, 1.78546500331661, 
-1.33259371048501,
+  -0.446566766553219, 0.569606122374976, -2.88971761441412, 
-0.869018343326555,
+  -0.461702683149641, -0.40910137444, -0.0201353678515895, 
-0.150382224136063,
+  -0.628126755843964, 1.32322085193283, -1.52135057001199, 
-0.437427868856691,
+  0.970577579543399, 0.0282226444247749, -0.0857821886527593, 
0.389214404984942
+)
+)
+val RCompResult = Statistics.ksTest(RData, "stdnorm")
+assert(RCompResult.statistic ~== RKSStat relTol 1e-4)
+assert(RCompResult.pValue ~== RKSPVal relTol 1e-4)
+
--- End diff --
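
For reference, the statistic being checked in the test above can be computed
locally as follows (a minimal sketch, not the PR's distributed implementation;
`cdf` stands in for any theoretical CDF, e.g. stdNormalDist.cumulativeProbability):

    // One-sample, two-sided KS statistic: the largest distance between the
    // empirical CDF of the sorted sample and the theoretical CDF.
    def ksStatistic(sample: Array[Double], cdf: Double => Double): Double = {
      val sorted = sample.sorted
      val n = sorted.length.toDouble
      sorted.zipWithIndex.map { case (x, i) =>
        val f = cdf(x)
        // the ECDF jumps from i/n to (i + 1)/n at the i-th sorted observation (0-indexed)
        math.max(f - i / n, (i + 1) / n - f)
      }.max
    }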

[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r34201295
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/ADTest.scala ---
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson Darling (AD) test, similarly to the Kolmogorov Smirnov 
(KS) test, tests whether the
+ * data follow a given theoretical distribution. It should be used with 
continuous data and
+ * assumes that no ties occur (the presence of ties can affect the 
validity of the test).
+ * The AD test provides an alternative to the Kolmogorov-Smirnov test. 
Namely, it is better
+ * suited to identify departures from the theoretical distribution at the 
tails.
+ * It is worth noting that the AD test's critical values depend on the
+ * distribution being tested against.
+ * The AD statistic is defined as -n - s/n, where
+ * s = sum from i=1 to n of (2i - 1)(ln(z_i) + ln(1 - z_{n+1-i}))
+ * where z_i is the CDF value of the ith observation in the sorted sample.
+ * For more information 
@see[[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object ADTest {
+
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * ADTheoreticalDist is a trait that every distribution used in an AD 
test must extend.
+   * The rationale for this is that the AD test has distribution-dependent 
critical values, and by
+   * requiring extension of this trait we guarantee that future additional 
distributions
+   * make sure to add the appropriate critical values (CVs) (or at least 
acknowledge
+   * that they should be added)
+   */
+  sealed trait ADTheoreticalDist {
+val params: Array[Double]  // parameters used to initialize the 
distribution
+
+def cdf(x: Double): Double // calculate the cdf under the given 
distribution for value x
+
+def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, 
adjusted for sample size
+  }
+
+  /**
+   * Sourced from
+   * 
http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs-
+   * test.pdf
+   * 
https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017
+   */
+
+  // Exponential distribution
+  class ADExponential(val params: Array[Double]) extends ADTheoreticalDist 
{
+private val theoretical = new ExponentialDistribution(params(0))
+
+private val rawCVs = ListMap(
+  0.15 -> 0.922, 0.10 -> 1.078,
+  0.05 -> 1.341, 0.025 -> 1.606, 0.01 -> 1.957
+)
+
+def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+def getCVs(n: Double): Map[Double, Double] = {
+  rawCVs.map { case (sig, cv) => sig -> cv / (1 + 0.6 / n) }
+}
+  }
+
+  // Normal Distribution
+  class ADNormal(val params: Array[Double]) extends ADTheoreticalDist {
+private val theoretical = new NormalDistribution(params(0), params(1))
+
+private val rawCVs = ListMap(
+  0.15 -> 0.576, 0.10 -> 0.656,
+  0.05 -> 0.787, 0.025 -> 0.918, 0.01 -> 1.092
+)
+
+def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+def getCVs(n: Double): Map[Double, Double] = {
+  rawCVs.map { case (sig, cv) => sig -> cv / (1 + 4.0 / n - 25.0 / (n 
* n)) }
+}
+  }
+
+  // Gumbel distribution
+  class ADGumbel(val params: Array[Double]) extends ADTheoreticalDist {
+private val theoretical = new GumbelDistribution(params(0), params(1
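
For reference, the statistic described in the class comment above can be computed
locally as follows (a minimal sketch, not the PR's distributed implementation;
`cdf` is the theoretical CDF being tested against):

    // A^2 = -n - (1/n) * sum_{i=1..n} (2i - 1) * (ln(z_i) + ln(1 - z_{n+1-i})),
    // where z_i is the CDF value of the i-th smallest observation.
    def adStatistic(sample: Array[Double], cdf: Double => Double): Double = {
      val z = sample.sorted.map(cdf)
      val n = z.length
      val s = (1 to n).map { i =>
        (2 * i - 1) * (math.log(z(i - 1)) + math.log(1 - z(n - i)))
      }.sum
      -n - s / n
    }

The resulting statistic would then be compared against the size-adjusted critical
values returned by getCVs for the distribution under test.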

[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r34200807
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -153,4 +157,101 @@ class HypothesisTestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
   Statistics.chiSqTest(sc.parallelize(continuousFeature, 2))
 }
   }
+
+  test("1 sample Kolmogorov-Smirnov test") {
+// Create theoretical distributions
+val stdNormalDist = new NormalDistribution(0, 1)
+val expDist = new ExponentialDistribution(0.6)
+val unifDist = new UniformRealDistribution()
+
+// set seeds
+val seed = 10L
+stdNormalDist.reseedRandomGenerator(seed)
+expDist.reseedRandomGenerator(seed)
+unifDist.reseedRandomGenerator(seed)
+
+// Sample data from the distributions and parallelize it
+val n = 10
+val sampledNorm = sc.parallelize(stdNormalDist.sample(n), 10)
+val sampledExp = sc.parallelize(expDist.sample(n), 10)
+val sampledUnif = sc.parallelize(unifDist.sample(n), 10)
+
+// Use a apache math commons local KS test to verify calculations
+val ksTest = new KolmogorovSmirnovTest()
+val pThreshold = 0.05
+
+// Comparing a standard normal sample to a standard normal distribution
+val result1 = Statistics.ksTest(sampledNorm, "stdnorm")
+val referenceStat1 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, 
sampledNorm.collect())
+val referencePVal1 = 1 - ksTest.cdf(referenceStat1, n)
+// Verify vs apache math commons ks test
+assert(result1.statistic === referenceStat1)
+assert(result1.pValue === referencePVal1)
+// Cannot reject null hypothesis
+assert(result1.pValue > pThreshold)
+
+// Comparing an exponential sample to a standard normal distribution
+val result2 = Statistics.ksTest(sampledExp, "stdnorm")
+val referenceStat2 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, 
sampledExp.collect())
+val referencePVal2 = 1 - ksTest.cdf(referenceStat2, n)
+// verify vs apache math commons ks test
+assert(result2.statistic === referenceStat2)
+assert(result2.pValue === referencePVal2)
+// reject null hypothesis
+assert(result2.pValue < pThreshold)
+
+// Testing the use of a user provided CDF function
+// Distribution is not serializable, so will have to create in the 
lambda
+val expCDF = (x: Double) => new 
ExponentialDistribution(0.2).cumulativeProbability(x)
+
+// Comparing an exponential sample with mean X to an exponential 
distribution with mean Y
+// Where X != Y
+val result3 = Statistics.ksTest(sampledExp, expCDF)
+val referenceStat3 = ksTest.kolmogorovSmirnovStatistic(new 
ExponentialDistribution(0.2),
+  sampledExp.collect())
+val referencePVal3 = 1 - ksTest.cdf(referenceStat3, 
sampledNorm.count().toInt)
+// verify vs apache math commons ks test
+assert(result3.statistic === referenceStat3)
+assert(result3.pValue === referencePVal3)
+// reject null hypothesis
+assert(result3.pValue < pThreshold)
+
+/*
+ Comparing results with R's implementation of Kolmogorov-Smirnov for 1 
sample
--- End diff --

Indent this block an extra space





[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r34200842
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -153,4 +157,101 @@ class HypothesisTestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
   Statistics.chiSqTest(sc.parallelize(continuousFeature, 2))
 }
   }
+
+  test("1 sample Kolmogorov-Smirnov test") {
+// Create theoretical distributions
+val stdNormalDist = new NormalDistribution(0, 1)
+val expDist = new ExponentialDistribution(0.6)
+val unifDist = new UniformRealDistribution()
+
+// set seeds
+val seed = 10L
+stdNormalDist.reseedRandomGenerator(seed)
+expDist.reseedRandomGenerator(seed)
+unifDist.reseedRandomGenerator(seed)
+
+// Sample data from the distributions and parallelize it
+val n = 10
+val sampledNorm = sc.parallelize(stdNormalDist.sample(n), 10)
+val sampledExp = sc.parallelize(expDist.sample(n), 10)
+val sampledUnif = sc.parallelize(unifDist.sample(n), 10)
+
+// Use a apache math commons local KS test to verify calculations
+val ksTest = new KolmogorovSmirnovTest()
+val pThreshold = 0.05
+
+// Comparing a standard normal sample to a standard normal distribution
+val result1 = Statistics.ksTest(sampledNorm, "stdnorm")
+val referenceStat1 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, 
sampledNorm.collect())
+val referencePVal1 = 1 - ksTest.cdf(referenceStat1, n)
+// Verify vs apache math commons ks test
+assert(result1.statistic === referenceStat1)
+assert(result1.pValue === referencePVal1)
+// Cannot reject null hypothesis
+assert(result1.pValue > pThreshold)
+
+// Comparing an exponential sample to a standard normal distribution
+val result2 = Statistics.ksTest(sampledExp, "stdnorm")
+val referenceStat2 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, 
sampledExp.collect())
+val referencePVal2 = 1 - ksTest.cdf(referenceStat2, n)
+// verify vs apache math commons ks test
+assert(result2.statistic === referenceStat2)
+assert(result2.pValue === referencePVal2)
+// reject null hypothesis
+assert(result2.pValue < pThreshold)
+
+// Testing the use of a user provided CDF function
+// Distribution is not serializable, so will have to create in the 
lambda
+val expCDF = (x: Double) => new 
ExponentialDistribution(0.2).cumulativeProbability(x)
+
+// Comparing an exponential sample with mean X to an exponential 
distribution with mean Y
+// Where X != Y
+val result3 = Statistics.ksTest(sampledExp, expCDF)
+val referenceStat3 = ksTest.kolmogorovSmirnovStatistic(new 
ExponentialDistribution(0.2),
+  sampledExp.collect())
+val referencePVal3 = 1 - ksTest.cdf(referenceStat3, 
sampledNorm.count().toInt)
+// verify vs apache math commons ks test
+assert(result3.statistic === referenceStat3)
+assert(result3.pValue === referencePVal3)
+// reject null hypothesis
+assert(result3.pValue < pThreshold)
+
+/*
+ Comparing results with R's implementation of Kolmogorov-Smirnov for 1 
sample
+  sessionInfo()
+ R version 3.2.0 (2015-04-16)
+ Platform: x86_64-apple-darwin13.4.0 (64-bit)
+  set.seed(20)
+  v <- rnorm(20)
+  v
+  [1]  1.16268529 -0.58592447  1.78546500 -1.33259371 -0.44656677  
0.56960612
+  [7] -2.88971761 -0.86901834 -0.46170268 -0.4091 -0.02013537 
-0.15038222
+ [13] -0.62812676  1.32322085 -1.52135057 -0.43742787  0.97057758  
0.02822264
+ [19] -0.08578219  0.38921440
+  ks.test(v, pnorm, alternative = "two.sided")
+
+ One-sample Kolmogorov-Smirnov test
+
+ data:  v
+ D = 0.18874, p-value = 0.4223
+ alternative hypothesis: two-sided
+*/
+
+val RKSStat = 0.18874
--- End diff --

lower case variable names





[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r34200828
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -153,4 +157,101 @@ class HypothesisTestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
   Statistics.chiSqTest(sc.parallelize(continuousFeature, 2))
 }
   }
+
+  test("1 sample Kolmogorov-Smirnov test") {
+// Create theoretical distributions
+val stdNormalDist = new NormalDistribution(0, 1)
+val expDist = new ExponentialDistribution(0.6)
+val unifDist = new UniformRealDistribution()
+
+// set seeds
+val seed = 10L
+stdNormalDist.reseedRandomGenerator(seed)
+expDist.reseedRandomGenerator(seed)
+unifDist.reseedRandomGenerator(seed)
+
+// Sample data from the distributions and parallelize it
+val n = 10
+val sampledNorm = sc.parallelize(stdNormalDist.sample(n), 10)
+val sampledExp = sc.parallelize(expDist.sample(n), 10)
+val sampledUnif = sc.parallelize(unifDist.sample(n), 10)
+
+// Use a apache math commons local KS test to verify calculations
+val ksTest = new KolmogorovSmirnovTest()
+val pThreshold = 0.05
+
+// Comparing a standard normal sample to a standard normal distribution
+val result1 = Statistics.ksTest(sampledNorm, "stdnorm")
+val referenceStat1 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, 
sampledNorm.collect())
+val referencePVal1 = 1 - ksTest.cdf(referenceStat1, n)
+// Verify vs apache math commons ks test
+assert(result1.statistic === referenceStat1)
+assert(result1.pValue === referencePVal1)
+// Cannot reject null hypothesis
+assert(result1.pValue > pThreshold)
+
+// Comparing an exponential sample to a standard normal distribution
+val result2 = Statistics.ksTest(sampledExp, "stdnorm")
+val referenceStat2 = ksTest.kolmogorovSmirnovStatistic(stdNormalDist, 
sampledExp.collect())
+val referencePVal2 = 1 - ksTest.cdf(referenceStat2, n)
+// verify vs apache math commons ks test
+assert(result2.statistic === referenceStat2)
+assert(result2.pValue === referencePVal2)
+// reject null hypothesis
+assert(result2.pValue < pThreshold)
+
+// Testing the use of a user provided CDF function
+// Distribution is not serializable, so will have to create in the 
lambda
+val expCDF = (x: Double) => new 
ExponentialDistribution(0.2).cumulativeProbability(x)
+
+// Comparing an exponential sample with mean X to an exponential 
distribution with mean Y
+// Where X != Y
+val result3 = Statistics.ksTest(sampledExp, expCDF)
+val referenceStat3 = ksTest.kolmogorovSmirnovStatistic(new 
ExponentialDistribution(0.2),
+  sampledExp.collect())
+val referencePVal3 = 1 - ksTest.cdf(referenceStat3, 
sampledNorm.count().toInt)
+// verify vs apache math commons ks test
+assert(result3.statistic === referenceStat3)
+assert(result3.pValue === referencePVal3)
+// reject null hypothesis
+assert(result3.pValue < pThreshold)
+
+/*
+ Comparing results with R's implementation of Kolmogorov-Smirnov for 1 
sample
+  sessionInfo()
+ R version 3.2.0 (2015-04-16)
+ Platform: x86_64-apple-darwin13.4.0 (64-bit)
+  set.seed(20)
+  v <- rnorm(20)
+  v
+  [1]  1.16268529 -0.58592447  1.78546500 -1.33259371 -0.44656677  
0.56960612
+  [7] -2.88971761 -0.86901834 -0.46170268 -0.4091 -0.02013537 
-0.15038222
+ [13] -0.62812676  1.32322085 -1.52135057 -0.43742787  0.97057758  
0.02822264
+ [19] -0.08578219  0.38921440
+  ks.test(v, pnorm, alternative = "two.sided")
+
+ One-sample Kolmogorov-Smirnov test
+
+ data:  v
+ D = 0.18874, p-value = 0.4223
+ alternative hypothesis: two-sided
+*/
+
+val RKSStat = 0.18874
+val RKSPVal = 0.4223
+val RData = sc.parallelize(
+Array(
--- End diff --

indent this back a space





[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r34196348
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -153,4 +153,166 @@ class HypothesisTestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
   Statistics.chiSqTest(sc.parallelize(continuousFeature, 2))
 }
   }
+
+  test("Anderson-Darling test for 1 sample") {
+/*
+  Data generated with R
+   sessionInfo() #truncated
+R version 3.2.0 (2015-04-16)
+Platform: x86_64-apple-darwin13.4.0 (64-bit)
+Running under: OS X 10.10.2 (Yosemite)
+   set.seed(10)
+   dataNorm <- rnorm(20)
+   dataExp <- rexp(20)
+   dataUnif <- runif(20)
+   mean(dataNorm)
+  [1] -0.06053267
+   sd(dataNorm)
+  [1] 0.7999093
+   mean(dataExp)
+  [1] 1.044636
+   sd(dataExp)
+  [1] 0.96727
+   mean(dataUnif)
+  [1] 0.4420219
+   sd(dataUnif)
+  [1] 0.2593285
+ */
+
+val dataNorm = sc.parallelize(
+  Array(0.0187461709418264, -0.184252542069064, -1.37133054992251,
+  -0.599167715783718, 0.294545126567508, 0.389794300700167, 
-1.20807617542949,
+  -0.363676017470862, -1.62667268170309, -0.256478394123992, 
1.10177950308713,
+  0.755781508027337, -0.238233556018718, 0.98744470341339, 
0.741390128383824,
+  0.0893472664958216, -0.954943856152377, -0.195150384667239, 
0.92552126209408,
+  0.482978524836611)
+)
+val dataExp = sc.parallelize(
+  Array(0.795082630547595, 1.39629918233218, 1.39810742601556, 
1.11045944034578,
+0.170421596598791, 1.91878133072498, 0.166443939786404, 
0.97028998914142, 0.010571192484349,
+2.79300971312409, 2.35461177957702, 0.667238388210535, 
0.522243486717343, 0.146712897811085,
+0.751234306178963, 2.2885662248, 0.0688535687513649, 
0.282713153399527,
+0.0514786350540817, 3.02959313971882)
+)
+val dataUnif = sc.parallelize(
+  Array(0.545859839767218, 0.372763097286224, 0.961302414536476, 
0.257341569056734,
+0.207951683318242, 0.861382439732552, 0.464391982648522, 
0.222867433447391,
+0.623549601528794, 0.203647700604051, 0.0196734135970473, 
0.797993005951867,
+0.274318896699697, 0.166609104024246, 0.170151718193665, 
0.4885059366934,
+0.711409077281132, 0.591934921452776, 0.517156876856461, 
0.381627685856074)
+)
+
+/* normality test in R
+ library(nortest)
--- End diff --

Can you indent inside this comment block a little further





[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r34196274
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -153,4 +153,166 @@ class HypothesisTestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
   Statistics.chiSqTest(sc.parallelize(continuousFeature, 2))
 }
   }
+
+  test("Anderson-Darling test for 1 sample") {
+/*
+  Data generated with R
+   sessionInfo() #truncated
+R version 3.2.0 (2015-04-16)
+Platform: x86_64-apple-darwin13.4.0 (64-bit)
+Running under: OS X 10.10.2 (Yosemite)
+   set.seed(10)
+   dataNorm <- rnorm(20)
+   dataExp <- rexp(20)
+   dataUnif <- runif(20)
+   mean(dataNorm)
+  [1] -0.06053267
+   sd(dataNorm)
+  [1] 0.7999093
+   mean(dataExp)
+  [1] 1.044636
+   sd(dataExp)
+  [1] 0.96727
+   mean(dataUnif)
+  [1] 0.4420219
+   sd(dataUnif)
+  [1] 0.2593285
+ */
+
+val dataNorm = sc.parallelize(
+  Array(0.0187461709418264, -0.184252542069064, -1.37133054992251,
+  -0.599167715783718, 0.294545126567508, 0.389794300700167, 
-1.20807617542949,
+  -0.363676017470862, -1.62667268170309, -0.256478394123992, 
1.10177950308713,
+  0.755781508027337, -0.238233556018718, 0.98744470341339, 
0.741390128383824,
+  0.0893472664958216, -0.954943856152377, -0.195150384667239, 
0.92552126209408,
+  0.482978524836611)
+)
+val dataExp = sc.parallelize(
+  Array(0.795082630547595, 1.39629918233218, 1.39810742601556, 
1.11045944034578,
+0.170421596598791, 1.91878133072498, 0.166443939786404, 
0.97028998914142, 0.010571192484349,
+2.79300971312409, 2.35461177957702, 0.667238388210535, 
0.522243486717343, 0.146712897811085,
+0.751234306178963, 2.2885662248, 0.0688535687513649, 
0.282713153399527,
+0.0514786350540817, 3.02959313971882)
+)
+val dataUnif = sc.parallelize(
+  Array(0.545859839767218, 0.372763097286224, 0.961302414536476, 
0.257341569056734,
+0.207951683318242, 0.861382439732552, 0.464391982648522, 
0.222867433447391,
+0.623549601528794, 0.203647700604051, 0.0196734135970473, 
0.797993005951867,
+0.274318896699697, 0.166609104024246, 0.170151718193665, 
0.4885059366934,
+0.711409077281132, 0.591934921452776, 0.517156876856461, 
0.381627685856074)
+)
+
+/* normality test in R
+ library(nortest)
+ ad.test(datNorm)
+
+Anderson-Darling normality test
+
+data:  datNorm
+A = 0.27523, p-value = 0.6216
+
+ ad.test(datExp)
+
+Anderson-Darling normality test
+
+data:  datExp
+A = 0.79034, p-value = 0.03336
+
+ ad.test(datUnif)
+
+Anderson-Darling normality test
+
+data:  datUnif
+A = 0.31831, p-value = 0.5114
+
+*/
+
+val RNormADStats = Map("norm" -> 0.27523, "exp" -> 0.79034, "unif" -> 0.31831)
+val RParams = Map(
+  "norm" -> (-0.06053267, 0.7999093),
+  "exp" -> (1.044636, 0.96727),
+  "unif" -> (0.4420219, 0.2593285)
+)
+
+
+val ADTestNormNorm = ADTest.testOneSample(dataNorm, "norm")
+assert(ADTestNormNorm.statistic ~== RNormADStats("norm") relTol 1e-4)
+val ADTestExpNorm = Statistics.andersonDarlingTest(dataExp, "norm")
+assert(ADTestExpNorm.statistic ~== RNormADStats("exp") relTol 1e-4)
+val ADTestUnifNorm = Statistics.andersonDarlingTest(dataUnif, "norm")
+assert(ADTestUnifNorm.statistic ~== RNormADStats("unif") relTol 1e-4)
+
+// Testing passing in parameters estimated in R vs estimating directly
+assert(
+  Statistics.andersonDarlingTest(
+dataNorm,
+"norm",
+RParams("norm")._1,
+RParams("norm")._2
+  ).statistic
+  ~== ADTestNormNorm.statistic relTol 1e-4
+)
+assert(
+  Statistics.andersonDarlingTest(
+dataExp,
+"norm",
+RParams("exp")._1,
+RParams("exp")._2
+  ).statistic
+  ~== ADTestExpNorm.statistic relTol 1e-4
+)
+assert(
+  Statistics.andersonDarlingTest(
+dataUnif,
+"norm",
+RParams("unif")._1,
+RParams("unif")._2
+  ).statistic
+  ~== ADTestUnifNorm.statistic relTol 1e-4
+)
+
+/*
+  normality test in scipy: comparing critical values
+   from scipy.stats import anderson
+   # drop in values as arrays
+  ...
+ anderson(dataNorm, 'norm')
+(0.27523090925717852, array

[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r34201231
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/ADTest.scala ---
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson Darling (AD) test, similarly to the Kolmogorov Smirnov 
(KS) test, tests whether the
+ * data follow a given theoretical distribution. It should be used with 
continuous data and
+ * assumes that no ties occur (the presence of ties can affect the 
validity of the test).
+ * The AD test provides an alternative to the Kolmogorov-Smirnov test. 
Namely, it is better
+ * suited to identify departures from the theoretical distribution at the 
tails.
+ * It is worth noting that the AD test's critical values depend on the
+ * distribution being tested against.
+ * The AD statistic is defined as -n - s/n, where
+ * s = sum from i=1 to n of (2i - 1)(ln(z_i) + ln(1 - z_{n+1-i}))
+ * where z_i is the CDF value of the ith observation in the sorted sample.
+ * For more information 
@see[[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object ADTest {
+
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * ADTheoreticalDist is a trait that every distribution used in an AD 
test must extend.
+   * The rationale for this is that the AD test has distribution-dependent 
critical values, and by
+   * requiring extension of this trait we guarantee that future additional 
distributions
+   * make sure to add the appropriate critical values (CVs) (or at least 
acknowledge
+   * that they should be added)
+   */
+  sealed trait ADTheoreticalDist {
+val params: Array[Double]  // parameters used to initialize the 
distribution
+
+def cdf(x: Double): Double // calculate the cdf under the given 
distribution for value x
+
+def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, 
adjusted for sample size
+  }
+
+  /**
+   * Sourced from
+   * 
http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs-
+   * test.pdf
+   * 
https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017
+   */
+
+  // Exponential distribution
+  class ADExponential(val params: Array[Double]) extends ADTheoreticalDist 
{
+private val theoretical = new ExponentialDistribution(params(0))
+
+private val rawCVs = ListMap(
+  0.15 -> 0.922, 0.10 -> 1.078,
+  0.05 -> 1.341, 0.025 -> 1.606, 0.01 -> 1.957
+)
+
+def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+def getCVs(n: Double): Map[Double, Double] = {
+  rawCVs.map { case (sig, cv) => sig -> cv / (1 + 0.6 / n) }
+}
+  }
+
+  // Normal Distribution
+  class ADNormal(val params: Array[Double]) extends ADTheoreticalDist {
+private val theoretical = new NormalDistribution(params(0), params(1))
+
+private val rawCVs = ListMap(
+  0.15 -> 0.576, 0.10 -> 0.656,
+  0.05 -> 0.787, 0.025 -> 0.918, 0.01 -> 1.092
+)
+
+def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+def getCVs(n: Double): Map[Double, Double] = {
+  rawCVs.map { case (sig, cv) => sig -> cv / (1 + 4.0 / n - 25.0 / (n 
* n)) }
+}
+  }
+
+  // Gumbel distribution
+  class ADGumbel(val params: Array[Double]) extends ADTheoreticalDist {
+private val theoretical = new GumbelDistribution(params(0), params(1

[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r34201204
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/ADTest.scala ---
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson Darling (AD) test, similarly to the Kolmogorov Smirnov 
(KS) test, tests whether the
+ * data follow a given theoretical distribution. It should be used with 
continuous data and
+ * assumes that no ties occur (the presence of ties can affect the 
validity of the test).
+ * The AD test provides an alternative to the Kolmogorov-Smirnov test. 
Namely, it is better
+ * suited to identify departures from the theoretical distribution at the 
tails.
+ * It is worth noting that the AD test's critical values depend on the
+ * distribution being tested against.
+ * The AD statistic is defined as -n - s/n, where
+ * s = sum from i=1 to n of (2i - 1)(ln(z_i) + ln(1 - z_{n+1-i}))
+ * where z_i is the CDF value of the ith observation in the sorted sample.
+ * For more information 
@see[[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object ADTest {
+
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * ADTheoreticalDist is a trait that every distribution used in an AD 
test must extend.
+   * The rationale for this is that the AD test has distribution-dependent 
critical values, and by
+   * requiring extension of this trait we guarantee that future additional 
distributions
+   * make sure to add the appropriate critical values (CVs) (or at least 
acknowledge
+   * that they should be added)
+   */
+  sealed trait ADTheoreticalDist {
+val params: Array[Double]  // parameters used to initialize the 
distribution
+
+def cdf(x: Double): Double // calculate the cdf under the given 
distribution for value x
+
+def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, 
adjusted for sample size
+  }
+
+  /**
+   * Sourced from
+   * 
http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs-
+   * test.pdf
+   * 
https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017
+   */
+
+  // Exponential distribution
+  class ADExponential(val params: Array[Double]) extends ADTheoreticalDist 
{
+private val theoretical = new ExponentialDistribution(params(0))
+
+private val rawCVs = ListMap(
+  0.15 -> 0.922, 0.10 -> 1.078,
+  0.05 -> 1.341, 0.025 -> 1.606, 0.01 -> 1.957
+)
+
+def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+def getCVs(n: Double): Map[Double, Double] = {
+  rawCVs.map { case (sig, cv) => sig -> cv / (1 + 0.6 / n) }
+}
+  }
+
+  // Normal Distribution
+  class ADNormal(val params: Array[Double]) extends ADTheoreticalDist {
+private val theoretical = new NormalDistribution(params(0), params(1))
+
+private val rawCVs = ListMap(
+  0.15 -> 0.576, 0.10 -> 0.656,
+  0.05 -> 0.787, 0.025 -> 0.918, 0.01 -> 1.092
+)
+
+def cdf(x: Double): Double = theoretical.cumulativeProbability(x)
+
+def getCVs(n: Double): Map[Double, Double] = {
+  rawCVs.map { case (sig, cv) => sig -> cv / (1 + 4.0 / n - 25.0 / (n 
* n)) }
+}
+  }
+
+  // Gumbel distribution
+  class ADGumbel(val params: Array[Double]) extends ADTheoreticalDist {
+private val theoretical = new GumbelDistribution(params(0), params(1

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34214315
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -627,6 +641,29 @@ private[spark] class ExecutorAllocationManager(
 def isExecutorIdle(executorId: String): Boolean = {
   !executorIdToTaskIds.contains(executorId)
 }
+
+/**
+ * Get the number of locality aware pending tasks and related locality 
preferences as the
+ * hints used for executor allocation.
+ */
+def executorPlacementHints(): (Int, Map[String, Int]) =
+  allocationManager.synchronized {
+  var localityAwarePendingTasks: Int = 0
+  val localityToCount = new mutable.HashMap[String, Int]()
+  stageIdToPreferredLocations.values.foreach { localities =>
--- End diff --

Given that the values in stageIdToPreferredLocations don't change after the 
stage is submitted, it seems like we're going to be redoing a bunch of work 
each time we sync with the YarnAllocator.  Can we save intermediate information 
to avoid this?

Also, the way this works, we don't actually differentiate between pending, 
running, and completed tasks.  All tasks for a stage are counted as pending as 
long as the stage is still running.  How computationally expensive would it be 
to keep this up to date when tasks start and complete?
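
One possible shape for that bookkeeping (a sketch only; the hook names are
assumptions for illustration, not part of this diff):

    import scala.collection.mutable

    // Maintain the counts incrementally instead of rescanning
    // stageIdToPreferredLocations on every sync with the YarnAllocator.
    class LocalityTracker {
      private var localityAwarePendingTasks = 0
      private val hostToPendingCount = new mutable.HashMap[String, Int]()

      // called once per submitted task with its preferred hosts (possibly empty)
      def onTaskSubmitted(hosts: Seq[String]): Unit = synchronized {
        if (hosts.nonEmpty) {
          localityAwarePendingTasks += 1
          hosts.foreach(h => hostToPendingCount(h) = hostToPendingCount.getOrElse(h, 0) + 1)
        }
      }

      // called when a task starts or completes, so it is no longer pending
      def onTaskNoLongerPending(hosts: Seq[String]): Unit = synchronized {
        if (hosts.nonEmpty) {
          localityAwarePendingTasks -= 1
          hosts.foreach { h =>
            val c = hostToPendingCount.getOrElse(h, 1) - 1
            if (c <= 0) hostToPendingCount.remove(h) else hostToPendingCount(h) = c
          }
        }
      }

      def placementHints(): (Int, Map[String, Int]) = synchronized {
        (localityAwarePendingTasks, hostToPendingCount.toMap)
      }
    }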





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34213732
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -509,6 +514,10 @@ private[spark] class ExecutorAllocationManager(
 // Number of tasks currently running on the cluster.  Should be 0 when 
no stages are active.
 private var numRunningTasks: Int = _
 
+// stageId to preferred localities map, maintain the preferred node 
location of each task in
--- End diff --

location -> locations





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34213924
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -627,6 +641,29 @@ private[spark] class ExecutorAllocationManager(
 def isExecutorIdle(executorId: String): Boolean = {
   !executorIdToTaskIds.contains(executorId)
 }
+
+/**
+ * Get the number of locality aware pending tasks and related locality 
preferences as the
--- End diff --

Change this to:
Get a tuple of (the total number of pending tasks with locality 
preferences, a map where each pair is a node and the number of pending tasks 
that would like to be scheduled on that node).
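
For illustration, a minimal sketch of an aggregation that would produce such a tuple
(the parameter type is an assumption, since the quoted diff is cut off before the loop body):

    // (number of pending tasks with locality preferences,
    //  node -> number of pending tasks that would like to be scheduled there)
    def executorPlacementHints(
        stageIdToPreferredLocations: Map[Int, Seq[Seq[String]]]): (Int, Map[String, Int]) = {
      var localityAwarePendingTasks = 0
      val localityToCount = scala.collection.mutable.HashMap[String, Int]()
      stageIdToPreferredLocations.values.foreach { localities =>
        localities.filter(_.nonEmpty).foreach { hosts =>
          localityAwarePendingTasks += 1
          hosts.foreach(h => localityToCount(h) = localityToCount.getOrElse(h, 0) + 1)
        }
      }
      (localityAwarePendingTasks, localityToCount.toMap)
    }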





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34214220
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -627,6 +641,29 @@ private[spark] class ExecutorAllocationManager(
 def isExecutorIdle(executorId: String): Boolean = {
   !executorIdToTaskIds.contains(executorId)
 }
+
+/**
+ * Get the number of locality aware pending tasks and related locality 
preferences as the
+ * hints used for executor allocation.
+ */
+def executorPlacementHints(): (Int, Map[String, Int]) =
+  allocationManager.synchronized {
+  var localityAwarePendingTasks: Int = 0
--- End diff --

no need for `: Int`





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34214716
  
--- Diff: 
core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 ---
@@ -66,6 +66,12 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   // Executors we have requested the cluster manager to kill that have not 
died yet
   private val executorsPendingToRemove = new HashSet[String]
 
+  // A map to store preferred locality with its required count
+  protected var preferredLocalityToCount: Map[String, Int] = Map.empty
+
+  // The number of pending tasks which is locality required
+  protected var localityAwarePendingTasks: Int = 0
--- End diff --

Don't need `: Int`





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34215261
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param preferredLocalityToCounts a map to store the preferred 
hostname and possible task
+   *  numbers running on it, used as hints 
for container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  preferredLocalityToCounts: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy calculates the optimal locality preferences of YARN containers by considering
+ * the node ratio of pending tasks, the number of required cores/containers and the locality of
+ * currently existing containers. The goal of this algorithm is to maximize the number of tasks
+ * that would run locally.
+ *
+ * The details of this algorithm are described below. Suppose we have 20 tasks that
+ * require (host1, host2, host3) and 10 tasks that require (host1, host2, host4); each
+ * container has 2 cores and each task uses 1 cpu, so the required number of containers is
+ * 15 and the host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers already exist but none match the preferred localities, follow methods 1 and 2.
+ *
+ * 4. If containers already exist and some localities match. For example, if we have 1
+ * container on each node (host1: 1, host2: 1, host3: 1, host4: 1), and the expected containers
+ * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * then the newly requested containers on each node would be updated to (host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers in total.
+ *
+ *   4.1 If the requested container number (18) is more than the newly required
+ *   containers (12), follow method 1 with the updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If the requested container number (10) is less than or equal to the newly required
+ *   containers (12), follow method 2 with the updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers are existed and existing localities can
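
To make the ratio computation above concrete, a small sketch using the numbers from
the worked example in this comment (illustration only, not the PR's code):

    // 20 tasks prefer (host1, host2, host3); 10 tasks prefer (host1, host2, host4)
    val taskGroups = Seq((20, Seq("host1", "host2", "host3")), (10, Seq("host1", "host2", "host4")))
    val hostToTaskCount: Map[String, Int] = taskGroups
      .flatMap { case (count, hosts) => hosts.map(_ -> count) }
      .groupBy(_._1)
      .map { case (host, pairs) => host -> pairs.map(_._2).sum }
    // => Map(host1 -> 30, host2 -> 30, host3 -> 20, host4 -> 10), the 3 : 3 : 2 : 1 ratio
    // With 2 cores per container and 1 cpu per task, 15 containers are required, so the
    // expected per-host container counts are proportional to that ratio:
    val requiredContainers = 15
    val totalTasks = hostToTaskCount.values.sum
    val expectedHostContainers = hostToTaskCount.map { case (host, count) =>
      host -> math.ceil(count.toDouble / totalTasks * requiredContainers).toInt
    }
    // roughly Map(host1 -> 5, host2 -> 5, host3 -> 4, host4 -> 2), as in step 4 above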

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34214513
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -872,6 +872,25 @@ class DAGScheduler(
 // will be posted, which should always come after a corresponding 
SparkListenerStageSubmitted
 // event.
 stage.latestInfo = StageInfo.fromStage(stage, 
Some(partitionsToCompute.size))
+val taskIdToLocations = try {
+  stage match {
+case s: ShuffleMapStage =>
+  partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, 
id))}.toMap
+case s: ResultStage =>
+  val job = s.resultOfJob.get
+  partitionsToCompute.map { id =>
+val p: Int = job.partitions(id)
--- End diff --

No need for `: Int`





[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34215043
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param preferredLocalityToCounts a map to store the preferred 
hostname and possible task
+   *  numbers running on it, used as hints 
for container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  preferredLocalityToCounts: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy calculates the optimal locality preferences of YARN containers by considering
+ * the node ratio of pending tasks, the number of required cores/containers and the locality of
+ * currently existing containers. The goal of this algorithm is to maximize the number of tasks
+ * that would run locally.
+ *
+ * The details of this algorithm are described below. Suppose we have 20 tasks that
+ * require (host1, host2, host3) and 10 tasks that require (host1, host2, host4); each
+ * container has 2 cores and each task uses 1 cpu, so the required number of containers is
+ * 15 and the host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers already exist but none match the preferred localities, follow methods 1 and 2.
+ *
+ * 4. If containers already exist and some localities match. For example, if we have 1
+ * container on each node (host1: 1, host2: 1, host3: 1, host4: 1), and the expected containers
+ * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * then the newly requested containers on each node would be updated to (host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers in total.
+ *
+ *   4.1 If requested container number (18) is more than newly required 
containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If request container number (10) is more than newly required 
containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers are existed and existing localities can

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34215363
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -627,6 +641,29 @@ private[spark] class ExecutorAllocationManager(
 def isExecutorIdle(executorId: String): Boolean = {
   !executorIdToTaskIds.contains(executorId)
 }
+
+/**
+ * Get the number of locality aware pending tasks and related locality 
preferences as the
+ * hints used for executor allocation.
+ */
+def executorPlacementHints(): (Int, Map[String, Int]) =
--- End diff --

Do any tests exercise this code?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
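
For reference, a minimal sketch of the kind of hint described in the quoted doc
comment: count the pending tasks that carry locality preferences and how many of
them prefer each host. The names below are assumptions for illustration, not the
PR's actual ExecutorAllocationManager code.

// Hypothetical helper: derive (number of locality-aware pending tasks,
// host -> preferred task count) from pending tasks' preferred locations.
case class PendingTask(preferredHosts: Seq[String])

def placementHints(pendingTasks: Seq[PendingTask]): (Int, Map[String, Int]) = {
  val localityAware = pendingTasks.filter(_.preferredHosts.nonEmpty)
  val hostToCount = localityAware
    .flatMap(_.preferredHosts)
    .groupBy(identity)
    .map { case (host, occurrences) => (host, occurrences.size) }
  (localityAware.size, hostToCount)
}

// placementHints(Seq(PendingTask(Seq("host1", "host2")), PendingTask(Nil)))
// returns (1, Map("host1" -> 1, "host2" -> 1)).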



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34214210
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -627,6 +641,29 @@ private[spark] class ExecutorAllocationManager(
 def isExecutorIdle(executorId: String): Boolean = {
   !executorIdToTaskIds.contains(executorId)
 }
+
+/**
+ * Get the number of locality aware pending tasks and related locality 
preferences as the
+ * hints used for executor allocation.
+ */
+def executorPlacementHints(): (Int, Map[String, Int]) =
+  allocationManager.synchronized {
--- End diff --

indent everything inside the synchronized block


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34214494
  
--- Diff: core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
---
@@ -872,6 +872,25 @@ class DAGScheduler(
 // will be posted, which should always come after a corresponding 
SparkListenerStageSubmitted
 // event.
 stage.latestInfo = StageInfo.fromStage(stage, 
Some(partitionsToCompute.size))
+val taskIdToLocations = try {
+  stage match {
+case s: ShuffleMapStage =>
+  partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, 
id))}.toMap
+case s: ResultStage =>
+  val job = s.resultOfJob.get
+  partitionsToCompute.map { id =>
+val p: Int = job.partitions(id)
+(id, getPreferredLocs(stage.rdd, p))
+  }.toMap
+  }
+} catch {
+  case NonFatal(e) =>
--- End diff --

Why would we expect an exception to occur here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34214893
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param preferredLocalityToCounts a map to store the preferred 
hostname and possible task
+   *  numbers running on it, used as hints 
for container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  preferredLocalityToCounts: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
+ *
+ * 4. If containers are existed and some localities are matched. For 
example if we have 1
+ * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and 
the expected containers
+ * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to 
(host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required 
containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If request container number (10) is more than newly required 
containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers are existed and existing localities can
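
To make the ratio arithmetic in the quoted comment concrete, here is a minimal
sketch of the per-host expected-container computation implied by the example
above. The method name and the use of ceil are assumptions for illustration,
not the PR's exact code.

// Hypothetical sketch: turn per-host pending-task counts into the expected
// number of containers per host, proportional to the host ratio.
def expectedHostToContainerCount(
    requiredContainers: Int,
    hostToTaskCount: Map[String, Int]): Map[String, Int] = {
  val totalTaskCount = hostToTaskCount.values.sum.toDouble
  hostToTaskCount.map { case (host, count) =>
    host -> math.ceil(count / totalTaskCount * requiredContainers).toInt
  }
}

// With the example above (host1: 30, host2: 30, host3: 20, host4: 10) and 15
// required containers this gives host1 -> 5, host2 -> 5, host3 -> 4, host4 -> 2,
// matching the expected containers listed in case 4. Hosts with larger expected
// counts then appear in more of the requested containers' locality preference
// lists, which is what produces the 3 : 3 : 2 : 1 style placement ratios.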

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r34215406
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.SparkFunSuite
+
+class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers 
with BeforeAndAfterEach {
+
+  private val yarnAllocatorSuite = new YarnAllocatorSuite
--- End diff --

Is this used for anything?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7263] Add new shuffle manager which sto...

2015-07-07 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7265#issuecomment-119344309
  
Is my understanding correct that, with this shuffle manager, we wouldn't be 
able to do reduce-side sorting of records, or any map-side spilling?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7263] Add new shuffle manager which sto...

2015-07-07 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7265#issuecomment-119350825
  
Having the Parquet shuffle reader follow that pattern seems preferable to 
me over failing when spilling would be required.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-29 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33515526
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.{NormalDistribution, 
RealDistribution}
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ *
+ * Implementation note: We seek to implement the KS test with a minimal 
number of distributed
+ * passes. We sort the RDD, and then perform the following operations on a 
per-partition basis:
+ * calculate an empirical cumulative distribution value for each 
observation, and a theoretical
+ * cumulative distribution value. We know the latter to be correct, while 
the former will be off by
+ * a constant (how large the constant is depends on how many values 
precede it in other partitions).
+ * However, given that this constant simply shifts the ECDF upwards, but 
doesn't change its shape,
+ * and furthermore, that constant is the same within a given partition, we 
can pick 2 values
+ * in each partition that can potentially resolve to the largest global 
distance. Namely, we
+ * pick the minimum distance and the maximum distance. Additionally, we 
keep track of how many
+ * elements are in each partition. Once these three values have been 
returned for every partition,
+ * we can collect and operate locally. Locally, we can now adjust each 
distance by the appropriate
+ * constant (the cumulative sum of # of elements in the prior partitions 
divided by the data set
+ * size). Finally, we take the maximum absolute value, and this is the 
statistic.
+ */
+private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSampleTwoSided = Value("Sample follows theoretical 
distribution.")
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param cdf `Double => Double` function to calculate the theoretical 
CDF
+   * @return KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], cdf: Double => Double): 
KSTestResult = {
+val n = data.count().toDouble
+val localData = data.sortBy(x => x).mapPartitions { part =>
+val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
--- End diff --

indent these two spaces


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
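
As a companion to the implementation note quoted above, here is a minimal
sketch of the driver-side combine step it describes. The names and the sign of
the adjustment are assumptions (the per-partition differences are taken here as
local ECDF minus theoretical CDF); this is not the PR's actual code.

// Each partition, in sorted order, reports (local min difference, local max
// difference, element count). The constant shift for a partition is the number
// of elements in all earlier partitions divided by n.
def combinePartitionExtrema(
    partResults: Array[(Double, Double, Double)],
    n: Double): Double = {
  var precedingElements = 0.0
  var ksStat = 0.0
  for ((localMin, localMax, count) <- partResults) {
    val shift = precedingElements / n
    ksStat = math.max(ksStat,
      math.max(math.abs(localMin + shift), math.abs(localMax + shift)))
    precedingElements += count
  }
  ksStat
}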



[GitHub] spark pull request: [SPARK-4069] [YARN] When AppMaster finishes, t...

2015-06-29 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/5233#issuecomment-116877465
  
There's no bug on the Spark side.  @PraveenSeluka is this still something 
you're running into?  If this is causing pain to a lot of users it could be 
worth the workaround, but otherwise I'd close this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-29 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6994#issuecomment-116876951
  
This LGTM.  Can you file a separate JIRA to add Python support?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8374] [YARN] Job frequently hangs after ...

2015-06-29 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7083#issuecomment-116882631
  
Hi @xuchenCN what makes you think this is the right fix?  We already 
decrement `numExecutorsRunning` when a container is killed, so I think doing it 
again would be double-counting.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...

2015-06-26 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6679#issuecomment-115565919
  
Hi @watermen, thanks for reporting this.  Does the error occur every time 
or just occasionally?  What Hadoop version are you running?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-26 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33394080
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.{NormalDistribution, 
RealDistribution}
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ *
+ * Implementation note: We seek to implement the KS test with a minimal 
number of distributed
+ * passes. We sort the RDD, and then perform the following operations on a 
per-partition basis:
+ * calculate an empirical cumulative distribution value for each 
observation, and a theoretical
+ * cumulative distribution value. We know the latter to be correct, while 
the former will be off by
+ * a constant (how large the constant is depends on how many values 
precede it in other partitions).
+ * However, given that this constant simply shifts the ECDF upwards, but 
doesn't change its shape,
+ * and furthermore, that constant is the same within a given partition, we 
can pick 2 values
+ * in each partition that can potentially resolve to the largest global 
distance. Namely, we
+ * pick the minimum distance and the maximum distance. Additionally, we 
keep track of how many
+ * elements are in each partition. Once these three values have been 
returned for every partition,
+ * we can collect and operate locally. Locally, we can now adjust each 
distance by the appropriate
+ * constant (the cumulative sum of # of elements in the prior partitions 
divided by the data set
+ * size). Finally, we take the maximum absolute value, and this is the 
statistic.
+ */
+private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSampleTwoSided = Value("Sample follows theoretical 
distribution.")
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param cdf `Double => Double` function to calculate the theoretical 
CDF
+   * @return KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], cdf: Double => Double): 
KSTestResult = {
+val n = data.count().toDouble
+val localData = data.sortBy(x => x).mapPartitions { part =>
+  val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
+  searchOneSampleCandidates(partDiffs) // candidates: local extrema
+  }.collect()
+val ksStat = searchOneSampleStatistic(localData, n) // result: global 
extreme
+evalOneSampleP(ksStat, n.toLong)
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param createDist `Unit => RealDistribution` function to create a 
theoretical distribution
+   * @return KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], createDist: () => 
RealDistribution): KSTestResult = {
+val n = data.count().toDouble
+val localData = data.sortBy(x => x).mapPartitions { part =>
+  val

[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-26 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33394012
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.{NormalDistribution, 
RealDistribution}
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ *
+ * Implementation note: We seek to implement the KS test with a minimal 
number of distributed
+ * passes. We sort the RDD, and then perform the following operations on a 
per-partition basis:
+ * calculate an empirical cumulative distribution value for each 
observation, and a theoretical
+ * cumulative distribution value. We know the latter to be correct, while 
the former will be off by
+ * a constant (how large the constant is depends on how many values 
precede it in other partitions).
+ * However, given that this constant simply shifts the ECDF upwards, but 
doesn't change its shape,
+ * and furthermore, that constant is the same within a given partition, we 
can pick 2 values
+ * in each partition that can potentially resolve to the largest global 
distance. Namely, we
+ * pick the minimum distance and the maximum distance. Additionally, we 
keep track of how many
+ * elements are in each partition. Once these three values have been 
returned for every partition,
+ * we can collect and operate locally. Locally, we can now adjust each 
distance by the appropriate
+ * constant (the cumulative sum of # of elements in the prior partitions 
divided by the data set
+ * size). Finally, we take the maximum absolute value, and this is the 
statistic.
+ */
+private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSampleTwoSided = Value("Sample follows theoretical 
distribution.")
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param cdf `Double => Double` function to calculate the theoretical 
CDF
+   * @return KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], cdf: Double => Double): 
KSTestResult = {
+val n = data.count().toDouble
+val localData = data.sortBy(x => x).mapPartitions { part =>
+  val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
+  searchOneSampleCandidates(partDiffs) // candidates: local extrema
+  }.collect()
+val ksStat = searchOneSampleStatistic(localData, n) // result: global 
extreme
+evalOneSampleP(ksStat, n.toLong)
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param createDist `Unit => RealDistribution` function to create a 
theoretical distribution
+   * @return KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], createDist: () => 
RealDistribution): KSTestResult = {
+val n = data.count().toDouble
+val localData = data.sortBy(x => x).mapPartitions { part =>
+  val

[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-26 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33393943
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.{NormalDistribution, 
RealDistribution}
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ *
+ * Implementation note: We seek to implement the KS test with a minimal 
number of distributed
+ * passes. We sort the RDD, and then perform the following operations on a 
per-partition basis:
+ * calculate an empirical cumulative distribution value for each 
observation, and a theoretical
+ * cumulative distribution value. We know the latter to be correct, while 
the former will be off by
+ * a constant (how large the constant is depends on how many values 
precede it in other partitions).
+ * However, given that this constant simply shifts the ECDF upwards, but 
doesn't change its shape,
+ * and furthermore, that constant is the same within a given partition, we 
can pick 2 values
+ * in each partition that can potentially resolve to the largest global 
distance. Namely, we
+ * pick the minimum distance and the maximum distance. Additionally, we 
keep track of how many
+ * elements are in each partition. Once these three values have been 
returned for every partition,
+ * we can collect and operate locally. Locally, we can now adjust each 
distance by the appropriate
+ * constant (the cumulative sum of # of elements in the prior partitions 
divided by the data set
+ * size). Finally, we take the maximum absolute value, and this is the 
statistic.
+ */
+private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSampleTwoSided = Value("Sample follows theoretical 
distribution.")
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param cdf `Double => Double` function to calculate the theoretical 
CDF
+   * @return KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], cdf: Double => Double): 
KSTestResult = {
+val n = data.count().toDouble
+val localData = data.sortBy(x => x).mapPartitions { part =>
+  val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
+  searchOneSampleCandidates(partDiffs) // candidates: local extrema
+  }.collect()
--- End diff --

indent this back two spaces


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-26 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33394039
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.{NormalDistribution, 
RealDistribution}
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ *
+ * Implementation note: We seek to implement the KS test with a minimal 
number of distributed
+ * passes. We sort the RDD, and then perform the following operations on a 
per-partition basis:
+ * calculate an empirical cumulative distribution value for each 
observation, and a theoretical
+ * cumulative distribution value. We know the latter to be correct, while 
the former will be off by
+ * a constant (how large the constant is depends on how many values 
precede it in other partitions).
+ * However, given that this constant simply shifts the ECDF upwards, but 
doesn't change its shape,
+ * and furthermore, that constant is the same within a given partition, we 
can pick 2 values
+ * in each partition that can potentially resolve to the largest global 
distance. Namely, we
+ * pick the minimum distance and the maximum distance. Additionally, we 
keep track of how many
+ * elements are in each partition. Once these three values have been 
returned for every partition,
+ * we can collect and operate locally. Locally, we can now adjust each 
distance by the appropriate
+ * constant (the cumulative sum of # of elements in the prior partitions 
divided by the data set
+ * size). Finally, we take the maximum absolute value, and this is the 
statistic.
+ */
+private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSampleTwoSided = Value("Sample follows theoretical 
distribution.")
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param cdf `Double => Double` function to calculate the theoretical 
CDF
+   * @return KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], cdf: Double => Double): 
KSTestResult = {
+val n = data.count().toDouble
+val localData = data.sortBy(x => x).mapPartitions { part =>
+  val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
+  searchOneSampleCandidates(partDiffs) // candidates: local extrema
+  }.collect()
+val ksStat = searchOneSampleStatistic(localData, n) // result: global 
extreme
+evalOneSampleP(ksStat, n.toLong)
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param createDist `Unit => RealDistribution` function to create a 
theoretical distribution
+   * @return KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], createDist: () => 
RealDistribution): KSTestResult = {
+val n = data.count().toDouble
+val localData = data.sortBy(x => x).mapPartitions { part =>
+  val

[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-26 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33393914
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.{NormalDistribution, 
RealDistribution}
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.rdd.RDD
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ *
+ * Implementation note: We seek to implement the KS test with a minimal 
number of distributed
+ * passes. We sort the RDD, and then perform the following operations on a 
per-partition basis:
+ * calculate an empirical cumulative distribution value for each 
observation, and a theoretical
+ * cumulative distribution value. We know the latter to be correct, while 
the former will be off by
+ * a constant (how large the constant is depends on how many values 
precede it in other partitions).
+ * However, given that this constant simply shifts the ECDF upwards, but 
doesn't change its shape,
+ * and furthermore, that constant is the same within a given partition, we 
can pick 2 values
+ * in each partition that can potentially resolve to the largest global 
distance. Namely, we
+ * pick the minimum distance and the maximum distance. Additionally, we 
keep track of how many
+ * elements are in each partition. Once these three values have been 
returned for every partition,
+ * we can collect and operate locally. Locally, we can now adjust each 
distance by the appropriate
+ * constant (the cumulative sum of # of elements in the prior partitions 
divided by the data set
+ * size). Finally, we take the maximum absolute value, and this is the 
statistic.
+ */
+private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSampleTwoSided = Value("Sample follows theoretical 
distribution.")
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param cdf `Double => Double` function to calculate the theoretical 
CDF
+   * @return KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], cdf: Double => Double): 
KSTestResult = {
+val n = data.count().toDouble
+val localData = data.sortBy(x => x).mapPartitions { part =>
+  val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
+  searchOneSampleCandidates(partDiffs) // candidates: local extrema
+  }.collect()
+val ksStat = searchOneSampleStatistic(localData, n) // result: global 
extreme
+evalOneSampleP(ksStat, n.toLong)
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param createDist `Unit => RealDistribution` function to create a 
theoretical distribution
+   * @return KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], createDist: () => 
RealDistribution): KSTestResult = {
+val n = data.count().toDouble
+val localData = data.sortBy(x => x).mapPartitions { part =>
+  val

[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-26 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6994#issuecomment-115876109
  
This looks great, just had a few more minor style comments.  Can you also 
add some documentation here: 
https://spark.apache.org/docs/latest/mllib-statistics.html#hypothesis-testing?  
The source for this document is in docs/mllib-statistics.md.

@jkbradley @mengxr any comments before I merge this?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
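
For context, a hedged usage sketch of the two entry points quoted earlier in
this thread. KSTest is private[stat], so this is illustrative only; the
SparkContext `sc` and the choice of a standard normal distribution are
assumptions.

import org.apache.commons.math3.distribution.NormalDistribution

val data = sc.parallelize(Seq(0.1, -0.4, 1.2, 0.7, -2.0))
val stdNormal = new NormalDistribution(0.0, 1.0)
// Pass an explicit CDF function...
val resultFromCdf =
  KSTest.testOneSample(data, (x: Double) => stdNormal.cumulativeProbability(x))
// ...or a factory for a commons-math RealDistribution.
val resultFromDist =
  KSTest.testOneSample(data, () => new NormalDistribution(0.0, 1.0))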



[GitHub] spark pull request: SPARK-8623. Hadoop RDDs fail to properly seria...

2015-06-26 Thread sryza
GitHub user sryza opened a pull request:

https://github.com/apache/spark/pull/7050

SPARK-8623. Hadoop RDDs fail to properly serialize configuration



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sryza/spark sandy-spark-8623

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/7050.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #7050


commit 58a80799ee953d19d3bf05906bcb00eb57da4c2c
Author: Sandy Ryza sa...@cloudera.com
Date:   2015-06-26T22:19:51Z

SPARK-8623. Hadoop RDDs fail to properly serialize configuration




---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: SPARK-8623. Hadoop RDDs fail to properly seria...

2015-06-26 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7050#issuecomment-115910651
  
jenkins, retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33315886
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param preferredLocalityToCount a map to store the preferred host 
name and its required count
+   *
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  preferredLocalityToCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
+ *
+ * 4. If containers are existed and some localities are matched. For 
example if we have 1
+ * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and 
the expected containers
+ * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to 
(host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required 
containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If request container number (10) is more than newly required 
containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers are existed and existing localities can fully cover 
the requested localities.
+ * For example if we have 5 containers on each

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33316253
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param preferredLocalityToCount a map to store the preferred host 
name and its required count
+   *
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  preferredLocalityToCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
+ *
+ * 4. If containers are existed and some localities are matched. For 
example if we have 1
+ * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and 
the expected containers
+ * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to 
(host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required 
containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If request container number (10) is more than newly required 
containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers are existed and existing localities can fully cover 
the requested localities.
+ * For example if we have 5 containers on each

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33315534
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -28,7 +28,10 @@ private[spark] trait ExecutorAllocationClient {
* This can result in canceling pending requests or filing additional 
requests.
* @return whether the request is acknowledged by the cluster manager.
*/
-  private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+  private[spark] def requestTotalExecutors(
+  numExecutors: Int,
--- End diff --

Document the new parameters


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33317386
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.SparkFunSuite
+
+class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers 
with BeforeAndAfterEach {
+
+  private val yarnAllocatorSuite = new YarnAllocatorSuite
+  import yarnAllocatorSuite._
+
+  override def beforeEach() {
+yarnAllocatorSuite.beforeEach()
+  }
+
+  override def afterEach() {
+yarnAllocatorSuite.afterEach()
+  }
+
+  test("allocate locality preferred containers with enough resource and no matched existed " +
+    "containers") {
+    // 1. All the locations of current containers cannot satisfy the new requirements
+    // 2. Current requested container number can fully satisfy the pending tasks.
+
+    val handler = createAllocator(2)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host2")))
+
+    val nodeLocalities =
+      handler.containerPlacementStrategy.localityOfRequestedContainers(3, 15,
+        Map("host3" -> 15, "host4" -> 15, "host5" -> 10))
+        .map(_.nodes)
+
+    assert(nodeLocalities === Array(
+      Array("host3", "host4", "host5"),
+      Array("host3", "host4", "host5"),
+      Array("host3", "host4")))
+  }
+
+  test("allocate locality preferred containers with enough resource and partially matched " +
+    "containers") {
+    // 1. Parts of current containers' locations can satisfy the new requirements
+    // 2. Current requested container number can fully satisfy the pending tasks.
+
+    val handler = createAllocator(3)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2")
+    ))
+
+    val nodeLocalities =
+      handler.containerPlacementStrategy.localityOfRequestedContainers(3, 15,
+        Map("host1" -> 15, "host2" -> 15, "host3" -> 10))
+        .map(_.nodes)
+
+    assert(nodeLocalities === Array(
+      null, // requested container with no locality preference
+      Array("host2", "host3"),
+      Array("host2", "host3")))
+  }
+
+  test("allocate locality preferred containers with limited resource and partially matched " +
+    "containers") {
+    // 1. Parts of current containers' locations can satisfy the new requirements
+    // 2. Current requested container number cannot fully satisfy the pending tasks.
+
+    val handler = createAllocator(3)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2")
+    ))
+
+    val nodeLocalities =
+      handler.containerPlacementStrategy.localityOfRequestedContainers(1, 15,
+        Map("host1" -> 15, "host2" -> 15, "host3" -> 10))
+        .map(_.nodes)
+
+    assert(nodeLocalities === Array(
+      /** newly requested locality preferred containers */
+      Array("host2", "host3")))
+  }
+
+  test("allocate locality preferred containers with fully matched containers") {
+    // Current containers' locations can fully satisfy the new requirements
+
+    val handler = createAllocator(5)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2"),
+      createContainer("host2"),
+      createContainer("host3")
+    ))
+
+    val nodeLocalities

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33317505
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.SparkFunSuite
+
+class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers 
with BeforeAndAfterEach {
+
+  private val yarnAllocatorSuite = new YarnAllocatorSuite
+  import yarnAllocatorSuite._
+
+  override def beforeEach() {
+yarnAllocatorSuite.beforeEach()
+  }
+
+  override def afterEach() {
+yarnAllocatorSuite.afterEach()
+  }
+
+  test("allocate locality preferred containers with enough resource and no matched existed " +
+    "containers") {
+    // 1. All the locations of current containers cannot satisfy the new requirements
+    // 2. Current requested container number can fully satisfy the pending tasks.
+
+    val handler = createAllocator(2)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
--- End diff --

Can fit on a single line?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33317634
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param preferredLocalityToCount a map to store the preferred host 
name and its required count
+   *
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  preferredLocalityToCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
+ *
+ * 4. If containers are existed and some localities are matched. For 
example if we have 1
+ * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and 
the expected containers
+ * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to 
(host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required 
containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If request container number (10) is more than newly required 
containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers are existed and existing localities can fully cover 
the requested localities.
+ * For example if we have 5 containers on each

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33317476
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -149,7 +159,13 @@ private[yarn] class YarnAllocator(
*
* @return Whether the new requested total is different than the old 
value.
*/
-  def requestTotalExecutors(requestedTotal: Int): Boolean = synchronized {
+  def requestTotalExecutorsWithPreferredLocalities(
--- End diff --

Document the new arguments.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33321039
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.{NormalDistribution, 
RealDistribution}
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ */
+private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+    type NullHypothesis = Value
+    val oneSampleTwoSided = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param cdf `Double => Double` function to calculate the theoretical CDF
+   * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], cdf: Double => Double): KSTestResult = {
+    val n = data.count().toDouble
+    val localData = data.sortBy(x => x).mapPartitions {
+      part =>
--- End diff --

put part on the line above and indent the rest back two spaces
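
For reference, the suggested layout would look roughly like this (style only; the logic from the diff is unchanged):

    val localData = data.sortBy(x => x).mapPartitions { part =>
      val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
      searchOneSampleCandidates(partDiffs) // candidates: local extrema
    }.collect()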


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33321013
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala 
---
@@ -158,4 +158,25 @@ object Statistics {
   def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = {
 ChiSqTest.chiSquaredFeatures(data)
   }
+
+  /**
+   * Conduct a one-sample, two sided Kolmogorov Smirnov test for 
probability distribution equality
+   * @param data an `RDD[Double]` containing the sample of data to test
+   * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value
+   * @return KSTestResult object containing test statistic, p-value, and null hypothesis.
+   */
+  def ksTest(data: RDD[Double], cdf: Double => Double): KSTestResult = {
+    KSTest.testOneSample(data, cdf)
+  }
+
+  /**
+   * A convenience method to conduct a one-sample, two sided Kolmogorov 
Smirnov test for probability
+   * distribution equality
+   * @param data an `RDD[Double]` containing the sample of data to test
+   * @param name a `String` name for a theoretical distribution
--- End diff --

Mention what distributions are supported
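
For illustration only, a hedged sketch of what a name-based dispatch behind this convenience method could look like; the distribution names and parameters below are hypothetical, since the diff does not show which distributions the PR actually supports:

    import org.apache.commons.math3.distribution.{NormalDistribution, RealDistribution}

    // Hypothetical helper: map a String name to a commons-math distribution.
    def namedDistribution(name: String, params: Double*): RealDistribution = name match {
      case "norm" if params.isEmpty => new NormalDistribution(0.0, 1.0) // standard normal default
      case "norm" => new NormalDistribution(params(0), params(1))       // mean, standard deviation
      case other => throw new IllegalArgumentException(s"Unsupported distribution: $other")
    }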


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33321093
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.{NormalDistribution, 
RealDistribution}
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ */
+private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+    type NullHypothesis = Value
+    val oneSampleTwoSided = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param cdf `Double => Double` function to calculate the theoretical CDF
+   * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], cdf: Double => Double): KSTestResult = {
+    val n = data.count().toDouble
+    val localData = data.sortBy(x => x).mapPartitions {
+      part =>
+        val partDiffs = oneSampleDifferences(part, n, cdf) // local distances
+        searchOneSampleCandidates(partDiffs) // candidates: local extrema
+    }.collect()
+    val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme
+    evalOneSampleP(ksStat, n.toInt)
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a theoretical distribution
+   * @param data `RDD[Double]` data on which to run test
+   * @param createDist `Unit => RealDistribution` function to create a theoretical distribution
+   * @return KSTestResult summarizing the test results (pval, statistic, and null hypothesis)
+   */
+  def testOneSample(data: RDD[Double], createDist: () => RealDistribution): KSTestResult = {
+    val n = data.count().toDouble
+    val localData = data.sortBy(x => x).mapPartitions {
+      part =>
+        val partDiffs = oneSampleDifferences(part, n, createDist) // local distances
+        searchOneSampleCandidates(partDiffs) // candidates: local extrema
+    }.collect()
+    val ksStat = searchOneSampleStatistic(localData, n) // result: global extreme
+    evalOneSampleP(ksStat, n.toInt)
+  }
+
+  /**
+   * Calculate unadjusted distances between the empirical CDF and the theoretical CDF in a
+   * partition
+   * @param partData `Iterator[Double]` 1 partition of a sorted RDD
+   * @param n `Double` the total size of the RDD
+   * @param cdf `Double => Double` a function that calculates the theoretical CDF of a value
+   * @return `Iterator[Double]` Unadjusted (i.e. off by a constant) differences between
+   *         ECDF (empirical cumulative distribution function) and CDF. We subtract in such a way
+   *         that when adjusted by the appropriate constant, the difference will be equivalent
+   *         to the KS statistic calculation described in
+   *         http://www.itl.nist.gov/div898/handbook/eda/section3/eda35g.htm
+   *         where the difference is not exactly symmetric
+   */
+  private def oneSampleDifferences(partData: Iterator[Double], n: Double, cdf: Double => Double

[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33321019
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.{NormalDistribution, 
RealDistribution}
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.rdd.RDD
+
--- End diff --

extra newline


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33321567
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.{NormalDistribution, 
RealDistribution}
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
--- End diff --

Can you include a note on how this is implemented?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33316936
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param preferredLocalityToCount a map to store the preferred host 
name and its required count
+   *
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  preferredLocalityToCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
+ *
+ * 4. If containers are existed and some localities are matched. For 
example if we have 1
+ * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and 
the expected containers
+ * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to 
(host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required 
containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If request container number (10) is more than newly required 
containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers are existed and existing localities can fully cover 
the requested localities.
+ * For example if we have 5 containers on each

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33317230
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param preferredLocalityToCount a map to store the preferred host 
name and its required count
+   *
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  preferredLocalityToCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
+ *
+ * 4. If containers are existed and some localities are matched. For 
example if we have 1
+ * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and 
the expected containers
+ * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to 
(host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required 
containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If request container number (10) is more than newly required 
containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers are existed and existing localities can fully cover 
the requested localities.
+ * For example if we have 5 containers on each

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33317350
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.SparkFunSuite
+
+class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers 
with BeforeAndAfterEach {
+
+  private val yarnAllocatorSuite = new YarnAllocatorSuite
+  import yarnAllocatorSuite._
+
+  override def beforeEach() {
+yarnAllocatorSuite.beforeEach()
+  }
+
+  override def afterEach() {
+yarnAllocatorSuite.afterEach()
+  }
+
+  test("allocate locality preferred containers with enough resource and no matched existed " +
+    "containers") {
+    // 1. All the locations of current containers cannot satisfy the new requirements
+    // 2. Current requested container number can fully satisfy the pending tasks.
+
+    val handler = createAllocator(2)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host2")))
+
+    val nodeLocalities =
+      handler.containerPlacementStrategy.localityOfRequestedContainers(3, 15,
+        Map("host3" -> 15, "host4" -> 15, "host5" -> 10))
+        .map(_.nodes)
+
+    assert(nodeLocalities === Array(
+      Array("host3", "host4", "host5"),
+      Array("host3", "host4", "host5"),
+      Array("host3", "host4")))
+  }
+
+  test("allocate locality preferred containers with enough resource and partially matched " +
+    "containers") {
+    // 1. Parts of current containers' locations can satisfy the new requirements
+    // 2. Current requested container number can fully satisfy the pending tasks.
+
+    val handler = createAllocator(3)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2")
+    ))
+
+    val nodeLocalities =
+      handler.containerPlacementStrategy.localityOfRequestedContainers(3, 15,
+        Map("host1" -> 15, "host2" -> 15, "host3" -> 10))
+        .map(_.nodes)
+
+    assert(nodeLocalities === Array(
+      null, // requested container with no locality preference
+      Array("host2", "host3"),
+      Array("host2", "host3")))
+  }
+
+  test("allocate locality preferred containers with limited resource and partially matched " +
+    "containers") {
+    // 1. Parts of current containers' locations can satisfy the new requirements
+    // 2. Current requested container number cannot fully satisfy the pending tasks.
+
+    val handler = createAllocator(3)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2")
+    ))
+
+    val nodeLocalities =
+      handler.containerPlacementStrategy.localityOfRequestedContainers(1, 15,
+        Map("host1" -> 15, "host2" -> 15, "host3" -> 10))
+        .map(_.nodes)
+
+    assert(nodeLocalities === Array(
+      /** newly requested locality preferred containers */
+      Array("host2", "host3")))
+  }
+
+  test("allocate locality preferred containers with fully matched containers") {
+    // Current containers' locations can fully satisfy the new requirements
+
+    val handler = createAllocator(5)
+    handler.updateResourceRequests()
+    handler.handleAllocatedContainers(Array(
+      createContainer("host1"),
+      createContainer("host1"),
+      createContainer("host2"),
+      createContainer("host2"),
+      createContainer("host3")
+    ))
+
+    val nodeLocalities

[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-25 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33321131
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/TestResult.scala ---
@@ -90,3 +90,18 @@ class ChiSqTestResult private[stat] (override val 
pValue: Double,
   super.toString
   }
 }
+
+/**
+ * :: Experimental ::
+ * Object containing the test results for the Kolmogorov-Smirnov test.
+ */
+@Experimental
+class KSTestResult private[stat] (override val pValue: Double,
+  override val statistic: Double,
--- End diff --

Indent these back to be consistent with the `ChiSqTestResult` definition


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33194722
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala ---
@@ -25,6 +25,7 @@ import org.apache.hadoop.net.DNSToSwitchMapping
 import org.apache.hadoop.yarn.api.records._
 import org.apache.hadoop.yarn.client.api.AMRMClient
 import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
+import org.scalatest.{BeforeAndAfterEach, Matchers}
--- End diff --

Any reason for moving these imports?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33195618
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] trait ContainerPlacementStrategy {
+  type Locality = Array[String]
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   *
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(numContainer: Int, 
numLocalityAwarePendingTasks: Int,
--- End diff --

When arguments overflow a single line, each gets its own line.
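
For example (style only), the declaration from the diff would become:

    def localityOfRequestedContainers(
        numContainer: Int,
        numLocalityAwarePendingTasks: Int,
        preferredLocalityToCount: Map[String, Int]): (Array[Locality], Array[Locality])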


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33195574
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] trait ContainerPlacementStrategy {
+  type Locality = Array[String]
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   *
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(numContainer: Int, 
numLocalityAwarePendingTasks: Int,
+  preferredLocalityToCount: Map[String, Int]): (Array[Locality], 
Array[Locality])
+}
+
+/**
+ * This strategy calculates the preferred localities by considering the 
node ratio of pending
--- End diff --

Can we mention here what this strategy is trying to optimize?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33191664
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala 
---
@@ -158,4 +158,44 @@ object Statistics {
   def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = {
 ChiSqTest.chiSquaredFeatures(data)
   }
+
+  /**
+   * Conduct a one-sample, two sided Kolmogorov Smirnov test for 
probability distribution equality
+   * @param data an `RDD[Double]` containing the sample of data to test
+   * @param cdf a `Double => Double` function to calculate the theoretical CDF at a given value
+   * @return KSTestResult object containing test statistic, p-value, and null hypothesis.
+   */
+  def ksTest(data: RDD[Double], cdf: Double => Double): KSTestResult = {
+    KSTest.testOneSample(data, cdf)
+  }
+
+  /**
+   * Conduct a one-sample, two sided Kolmogorov Smirnov test for 
probability distribution equality,
+   * which creates only 1 distribution object per partition (useful in 
conjunction with Apache
+   * Commons Math distributions)
+   * @param data an `RDD[Double]` containing the sample of data to test
+   * @param distCalc a `Iterator[(Double, Double, Double)] => Iterator[Double]` function, to
+   *                 calculate the distance between empirical values and theoretical values of
+   *                 a distribution. The first element corresponds to the value x, the second
+   *                 element is the lower bound of the empirical CDF, while the third element is
+   *                 the upper bound. Thus if we call the triple associated with an observation T,
+   *                 the KS distance at that point is max(Pr[X <= T._1] - T._2, T._3 - Pr[X <= T._1])
+   * @return KSTestResult object containing test statistic, p-value, and 
null hypothesis.
+   */
+  def ksTestOpt(data: RDD[Double],
--- End diff --

I think I'd leave this API out on the first pass, as, while there are 
definitely situations where it's useful, it's likely to be confusing to users.  
We can always add it in later if there's demand.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33195453
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] trait ContainerPlacementStrategy {
+  type Locality = Array[String]
--- End diff --

If I understand correctly, a pair of localities is used for a single 
ContainerRequest.

If this is the case, it might be most clear to remove the locality type, 
and replace it with a ContainerLocalityPreferences class that has two fields: 
`nodes: Array[String]` and `racks: Array[String]`.  Then 
`localityOfRequestedContainers` could just return an Array of these.
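
Concretely, the suggested shape would be something along these lines (it matches the later revision of the trait quoted elsewhere in this thread):

    private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String])

    private[yarn] trait ContainerPlacementStrategy {
      def localityOfRequestedContainers(
          numContainer: Int,
          numLocalityAwarePendingTasks: Int,
          preferredLocalityToCount: Map[String, Int]): Array[ContainerLocalityPreferences]
    }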



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r33196367
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] trait ContainerPlacementStrategy {
+  type Locality = Array[String]
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   *
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(numContainer: Int, 
numLocalityAwarePendingTasks: Int,
+  preferredLocalityToCount: Map[String, Int]): (Array[Locality], 
Array[Locality])
+}
+
+/**
+ * This strategy calculates the preferred localities by considering the 
node ratio of pending
--- End diff --

Which I think is something like:
If YARN were to give us containers that satisfied our locality preferences, 
and we were to fill them up with tasks until all tasks completed, we're trying 
to maximize the number of tasks that would run locally.

(this statement maybe needs to be revised to account for already-running 
containers and already-running tasks)
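
To make the intended behavior concrete, here is a minimal, self-contained sketch of the ratio-based expansion described in the ContainerPlacementStrategy doc above. The names (`preferredNodes`, `hostToPendingTaskCount`) and the ceiling-based rounding are illustrative assumptions, not the PR's actual implementation:

    object PlacementSketch {
      /** For each of `numContainers` locality-preferred container requests, return the hosts it
        * should prefer, so that per-host request counts roughly follow the ratio of
        * locality-aware pending tasks (3 : 3 : 2 : 1 in the doc's example). */
      def preferredNodes(
          numContainers: Int,
          hostToPendingTaskCount: Map[String, Int]): Seq[Seq[String]] = {
        val maxCount = hostToPendingTaskCount.values.max.toDouble
        // Expected number of container requests that should include each host.
        val expected = hostToPendingTaskCount.map { case (host, count) =>
          host -> math.ceil(count / maxCount * numContainers).toInt
        }
        // Container i prefers every host whose expected request count has not been used up yet.
        Seq.tabulate(numContainers) { i =>
          expected.collect { case (host, n) if n > i => host }.toSeq
        }
      }
    }

    // preferredNodes(15, Map("host1" -> 30, "host2" -> 30, "host3" -> 20, "host4" -> 10))
    // yields 5 requests preferring all four hosts, 5 preferring host1..host3, and
    // 5 preferring host1..host2 -- the 3 : 3 : 2 : 1 placement ratio described above.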


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6994#issuecomment-115113205
  
jenkins, test this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33224064
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.NormalDistribution
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ */
+private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSampleTwoSided = Value("Sample follows theoretical 
distribution.")
+  }
+
+  /**
+   * Calculate empirical cumulative distribution values needed for KS 
statistic
+   * @param data `RDD[Double]` on which to calculate empirical cumulative 
distribution values
+   * @return and RDD of (Double, Double, Double), where the first element 
in each tuple is the
+   * value, the second element is the ECDFV - 1 /n, and the third 
element is the ECDFV,
+   * where ECDF stands for empirical cumulative distribution 
function value
+   */
+  def empirical(data: RDD[Double]): RDD[(Double, Double, Double)] = {
+val n = data.count().toDouble
+data.sortBy(x => x).zipWithIndex().map { case (v, i) => (v, i / n, (i 
+ 1) / n) }
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param dat `RDD[Double]` to evaluate
+   * @param cdf `Double => Double` function to calculate the theoretical 
CDF
+   * @return KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(dat: RDD[Double], cdf: Double => Double): KSTestResult 
= {
+val empiriRDD = empirical(dat) // empirical distribution
+val distances = empiriRDD.map {
+case (v, dl, dp) =>
+  val cdfVal = cdf(v)
+  Math.max(cdfVal - dl, dp - cdfVal)
+  }
+val ksStat = distances.max()
+evalOneSampleP(ksStat, distances.count())
--- End diff --

Can we reuse the count that was computed in `empirical`?
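
For illustration, one way to do that (just a sketch against the methods in this 
file, not a prescription):

    // Sketch: have empirical() also return n so testOneSample doesn't trigger
    // a second count() over the data.
    def empirical(data: RDD[Double]): (Long, RDD[(Double, Double, Double)]) = {
      val n = data.count()
      val ecdf = data.sortBy(x => x).zipWithIndex().map { case (v, i) =>
        (v, i / n.toDouble, (i + 1) / n.toDouble)
      }
      (n, ecdf)
    }

    def testOneSample(data: RDD[Double], cdf: Double => Double): KSTestResult = {
      val (n, ecdf) = empirical(data)
      val ksStat = ecdf.map { case (v, dl, dp) =>
        val cdfVal = cdf(v)
        math.max(cdfVal - dl, dp - cdfVal)
      }.max()
      evalOneSampleP(ksStat, n)   // reuses n instead of calling distances.count()
    }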


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33223955
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.NormalDistribution
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ */
+private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSampleTwoSided = Value("Sample follows theoretical 
distribution.")
+  }
+
+  /**
+   * Calculate empirical cumulative distribution values needed for KS 
statistic
+   * @param data `RDD[Double]` on which to calculate empirical cumulative 
distribution values
+   * @return and RDD of (Double, Double, Double), where the first element 
in each tuple is the
+   * value, the second element is the ECDFV - 1 /n, and the third 
element is the ECDFV,
+   * where ECDF stands for empirical cumulative distribution 
function value
+   */
+  def empirical(data: RDD[Double]): RDD[(Double, Double, Double)] = {
+val n = data.count().toDouble
+data.sortBy(x => x).zipWithIndex().map { case (v, i) => (v, i / n, (i 
+ 1) / n) }
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param dat `RDD[Double]` to evaluate
+   * @param cdf `Double => Double` function to calculate the theoretical 
CDF
+   * @return KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(dat: RDD[Double], cdf: Double => Double): KSTestResult 
= {
--- End diff --

nit: data instead of dat


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8402][MLLIB] DP Means Clustering

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6880#discussion_r33174171
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/DpMeansModel.scala ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.pmml.PMMLExportable
+import org.apache.spark.mllib.util.{Loader, Saveable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SQLContext}
+
+/**
+ * A clustering model for DP means. Each point belongs to the cluster with 
the closest center.
+ */
+class DpMeansModel
--- End diff --

I don't know if there are plans, just thought it might be a good idea now 
that three's a crowd.  Probably best to wait for @mengxr or @jkbradley to weigh 
in before making changes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33188946
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.NormalDistribution
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.{SparkException, Logging}
--- End diff --

Alphabetize the imports in here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33188987
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.NormalDistribution
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ */
+  private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSampleTwoSided = Value("Sample follows theoretical 
distribution.")
+  }
+
+  /**
+   * Calculate empirical cumulative distribution values needed for KS 
statistic
+   * @param dat `RDD[Double]` on which to calculate empirical cumulative 
distribution values
+   * @return and RDD of (Double, Double, Double), where the first element 
in each tuple is the
+   * value, the second element is the ECDFV - 1 /n, and the third 
element is the ECDFV,
+   * where ECDF stands for empirical cumulative distribution 
function value
+   *
+   */
+  def empirical(dat: RDD[Double]): RDD[(Double, Double, Double)] = {
--- End diff --

Nit: replace dat with data


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33189226
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.NormalDistribution
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ */
+  private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSampleTwoSided = Value("Sample follows theoretical 
distribution.")
+  }
+
+  /**
+   * Calculate empirical cumulative distribution values needed for KS 
statistic
+   * @param dat `RDD[Double]` on which to calculate empirical cumulative 
distribution values
+   * @return and RDD of (Double, Double, Double), where the first element 
in each tuple is the
+   * value, the second element is the ECDFV - 1 /n, and the third 
element is the ECDFV,
+   * where ECDF stands for empirical cumulative distribution 
function value
+   *
--- End diff --

extra newline


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33189172
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.NormalDistribution
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ */
+  private[stat] object KSTest {
--- End diff --

indent this back two spaces


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33189336
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.NormalDistribution
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ */
+  private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSampleTwoSided = Value("Sample follows theoretical 
distribution.")
+  }
+
+  /**
+   * Calculate empirical cumulative distribution values needed for KS 
statistic
+   * @param dat `RDD[Double]` on which to calculate empirical cumulative 
distribution values
+   * @return and RDD of (Double, Double, Double), where the first element 
in each tuple is the
+   * value, the second element is the ECDFV - 1 /n, and the third 
element is the ECDFV,
+   * where ECDF stands for empirical cumulative distribution 
function value
+   *
+   */
+  def empirical(dat: RDD[Double]): RDD[(Double, Double, Double)] = {
+val n = dat.count().toDouble
+dat.sortBy(x => x).zipWithIndex().map { case (v, i) => (v, i / n, (i + 
1) / n) }
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param dat `RDD[Double]` to evaluate
+   * @param cdf `Double => Double` function to calculate the theoretical 
CDF
+   * @return a KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(dat: RDD[Double], cdf: Double => Double): KSTestResult 
= {
+val empiriRDD = empirical(dat) // empirical distribution
+val distances = empiriRDD.map {
+case (v, dl, dp) =>
+  val cdfVal = cdf(v)
+  Math.max(cdfVal - dl, dp - cdfVal)
+  }
+val ksStat = distances.max()
+evalOneSampleP(ksStat, distances.count())
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution. Optimized
+   * such that each partition runs a separate mapping operation. This can 
help in cases where the
+   * CDF calculation involves creating an object. By using this 
implementation we can make sure
+   * only 1 object is created per partition, versus 1 per observation.
+   * @param dat `RDD[Double]` to evaluate
+   * @param distCalc a function to calculate the distance between the 
empirical values and the
+   * theoretical value
+   * @return a KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSampleOpt(dat: RDD[Double],
+   distCalc: Iterator[(Double, Double, Double)] => 
Iterator[Double])
--- End diff --

`distCalc` should be indented four spaces past `def` and `: KSTestResult` 
should be indented two spaces past `def`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33189576
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -153,4 +157,61 @@ class HypothesisTestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
   Statistics.chiSqTest(sc.parallelize(continuousFeature, 2))
 }
   }
+
+  test("kolmogorov smirnov test empirical distributions") {
+
--- End diff --

unnecessary newline


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33189552
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -19,6 +19,10 @@ package org.apache.spark.mllib.stat
 
 import java.util.Random
 
+import org.apache.commons.math3.distribution.{ExponentialDistribution,
+NormalDistribution, UniformRealDistribution}
--- End diff --

Indent two spaces forward


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33189747
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala 
---
@@ -158,4 +158,44 @@ object Statistics {
   def chiSqTest(data: RDD[LabeledPoint]): Array[ChiSqTestResult] = {
 ChiSqTest.chiSquaredFeatures(data)
   }
+
+  /**
+   * Conduct a one-sample, two sided Kolmogorov Smirnov test for 
probability distribution equality
+   * @param data an `RDD[Double]` containing the sample of data to test
+   * @param cdf a `Double => Double` function to calculate the theoretical 
CDF at a given value
+   * @return KSTestResult object containing test statistic, p-value, and 
null hypothesis.
+   */
+  def ksTest(data: RDD[Double], cdf: Double => Double): KSTestResult = {
+KSTest.testOneSample(data, cdf)
+  }
+
+  /**
+   * Conduct a one-sample, two sided Kolmogorov Smirnov test for 
probability distribution equality,
+   * which creates only 1 distribution object per partition (useful in 
conjunction with Apache
+   * Commons Math distributions)
+   * @param data an `RDD[Double]` containing the sample of data to test
+   * @param distCalc a `Iterator[(Double, Double, Double)] => 
Iterator[Double]` function, to
+   * calculate the distance between empirical values and 
theoretical values of
+   * a distribution. The first element corresponds to the 
value x, the second
+   * element is the lower bound of the empirical CDF, 
while the third element is
+   * the upper bound. Thus if we call triple associated 
with an observation T, the
+   * KS distance at that point is max(Pr[X <= T._1] - 
T._2, T._3 - Pr[X <= T._1])
+   * @return KSTestResult object containing test statistic, p-value, and 
null hypothesis.
+   */
+  def ksTestOpt(data: RDD[Double],
+  distCalc: Iterator[(Double, Double, Double)] => Iterator[Double])
+  : KSTestResult = {
+KSTest.testOneSampleOpt(data, distCalc)
+  }
+
+  /**
+   * A convenience method to conduct a one-sample, two sided Kolmogorov 
Smirnov test for probability
+   * distribution equality
+   * @param data an `RDD[Double]` containing the sample of data to test
+   * @param name a `String` name for a theoretical distribution
+   * @return KSTestResult object containing test statistic, p-value, and 
null hypothesis.
+   */
+  def ksTest(data: RDD[Double], name: String): KSTestResult = {
--- End diff --

Put this up below the other `ksTest` method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8598] [MLlib] Implementation of 1-sampl...

2015-06-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6994#discussion_r33190190
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KSTest.scala ---
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import org.apache.commons.math3.distribution.NormalDistribution
+import org.apache.commons.math3.stat.inference.KolmogorovSmirnovTest
+
+import org.apache.spark.{SparkException, Logging}
+import org.apache.spark.rdd.RDD
+
+
+/**
+ * Conduct the two-sided Kolmogorov Smirnov test for data sampled from a
+ * continuous distribution. By comparing the largest difference between 
the empirical cumulative
+ * distribution of the sample data and the theoretical distribution we can 
provide a test for the
+ * the null hypothesis that the sample data comes from that theoretical 
distribution.
+ * For more information on KS Test: 
https://en.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test
+ */
+  private[stat] object KSTest {
+
+  // Null hypothesis for the type of KS test to be included in the result.
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val oneSampleTwoSided = Value("Sample follows theoretical 
distribution.")
+  }
+
+  /**
+   * Calculate empirical cumulative distribution values needed for KS 
statistic
+   * @param dat `RDD[Double]` on which to calculate empirical cumulative 
distribution values
+   * @return and RDD of (Double, Double, Double), where the first element 
in each tuple is the
+   * value, the second element is the ECDFV - 1 /n, and the third 
element is the ECDFV,
+   * where ECDF stands for empirical cumulative distribution 
function value
+   *
+   */
+  def empirical(dat: RDD[Double]): RDD[(Double, Double, Double)] = {
+val n = dat.count().toDouble
+dat.sortBy(x => x).zipWithIndex().map { case (v, i) => (v, i / n, (i + 
1) / n) }
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution
+   * @param dat `RDD[Double]` to evaluate
+   * @param cdf `Double => Double` function to calculate the theoretical 
CDF
+   * @return a KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSample(dat: RDD[Double], cdf: Double => Double): KSTestResult 
= {
+val empiriRDD = empirical(dat) // empirical distribution
+val distances = empiriRDD.map {
+case (v, dl, dp) =>
+  val cdfVal = cdf(v)
+  Math.max(cdfVal - dl, dp - cdfVal)
+  }
+val ksStat = distances.max()
+evalOneSampleP(ksStat, distances.count())
+  }
+
+  /**
+   * Runs a KS test for 1 set of sample data, comparing it to a 
theoretical distribution. Optimized
+   * such that each partition runs a separate mapping operation. This can 
help in cases where the
+   * CDF calculation involves creating an object. By using this 
implementation we can make sure
+   * only 1 object is created per partition, versus 1 per observation.
+   * @param dat `RDD[Double]` to evaluate
+   * @param distCalc a function to calculate the distance between the 
empirical values and the
+   * theoretical value
+   * @return a KSTestResult summarizing the test results (pval, statistic, 
and null hypothesis)
+   */
+  def testOneSampleOpt(dat: RDD[Double],
+   distCalc: Iterator[(Double, Double, Double)] => 
Iterator[Double])
+  : KSTestResult = {
+val empiriRDD = empirical(dat) // empirical distribution information
+val distances = empiriRDD.mapPartitions(distCalc, false)
+val ksStat = distances.max
+evalOneSampleP(ksStat, distances.count())
+  }
+
+  /**
+   * Returns a function to calculate the KSTest with a standard normal 
distribution
+   * to be used with testOneSampleOpt

[GitHub] spark pull request: [SPARK-8402][MLLIB] DP Means Clustering

2015-06-18 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6880#discussion_r32793919
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/clustering/DpMeansModel.scala ---
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.clustering
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable
+
+import org.json4s.DefaultFormats
+import org.json4s.JsonDSL._
+import org.json4s.jackson.JsonMethods._
+
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.mllib.pmml.PMMLExportable
+import org.apache.spark.mllib.util.{Loader, Saveable}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{Row, SQLContext}
+
+/**
+ * A clustering model for DP means. Each point belongs to the cluster with 
the closest center.
+ */
+class DpMeansModel
--- End diff --

Is there any difference between this and a `KMeansModel`?  Might we be able 
to consolidate them into something like a `ClusterCentersModel`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-18 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r32776975
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] trait ContainerPlacementStrategy {
+  type Locality = Array[String]
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(numContainer: Int): (Array[Locality], 
Array[Locality])
--- End diff --

Can we put all the information that's used to determine the locality 
requests as arguments to this function?  That would make it clearer to later 
readers what goes into this calculation.
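
Something along these lines is what I have in mind (a sketch of the shape only, 
with placeholder parameter names):

    private[yarn] trait ContainerPlacementStrategy {
      type Locality = Array[String]

      // Every input the placement decision depends on is an explicit argument,
      // so readers can see at a glance what drives the locality calculation.
      def localityOfRequestedContainers(
          numContainers: Int,
          numLocalityAwarePendingTasks: Int,
          preferredHostToTaskCount: Map[String, Int],
          allocatedHostToContainerCount: Map[String, Int]): (Array[Locality], Array[Locality])
    }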


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...

2015-06-18 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6679#issuecomment-113322897
  
@JoshRosen this should be ready for merge


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-17 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6394#issuecomment-112981135
  
Hi @jerryshao is this ready for review?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...

2015-06-10 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6679#issuecomment-110994319
  
retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2774] Set preferred locations for reduc...

2015-06-09 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6652#issuecomment-110566851
  
@shivaram @kayousterhout this approach addresses my concerns.  Thanks for 
updating!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...

2015-06-09 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6679#issuecomment-110571366
  
I definitely don't think we rely on it in Spark.  On Cloudera setups, as 
well as presumably Hortonworks and MapR setups, client configurations are 
synchronized globally across nodes, so this discrepancy couldn't occur.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r31962024
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -438,6 +519,15 @@ private[yarn] class YarnAllocator(
 amClient.releaseAssignedContainer(container.getId())
   }
 
+  private def rackLookUp(host: String, conf: Configuration): String = {
+hostToRackNameCache.get(host) match {
--- End diff --

Have you noticed a need for this caching?  RackResolver should be doing its 
own caching, so this shouldn't be needed.
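
i.e., I'd expect something like this to be sufficient (a sketch -- I haven't 
measured it):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.yarn.util.RackResolver

    // Sketch: lean on RackResolver's internal caching instead of keeping a
    // separate hostToRackNameCache here.
    private def rackLookUp(host: String, conf: Configuration): String = {
      Option(RackResolver.resolve(conf, host)).map(_.getNetworkLocation).orNull
    }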


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r31973076
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -225,12 +243,74 @@ private[yarn] class YarnAllocator(
   logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
 s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
 
-  for (i <- 0 until missing) {
-val request = createContainerRequest(resource)
-amClient.addContainerRequest(request)
-val nodes = request.getNodes
-val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
-logInfo(s"Container request (host: $hostStr, capability: $resource)")
+  // Calculated the number of executors we expected to satisfy all the 
preferred locality tasks
--- End diff --

One possible issue with the approach here is that our locality preferences 
might shift when a stage completes and a new stage comes in.  If we have a set 
of outstanding requests, it would be nice if we could cancel requests for the 
locations that we no longer need and place our requests on nodes that we now 
care about.

One way we could achieve this: each time we call `updateResourceRequests`, 
remove all the unsatisfied container requests from amClient and resubmit 
requests based on our current needs.  I.e., after removal, missing would always 
be `targetNumExecutors - numExecutorsRunning`.
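
Roughly, and only as a sketch of the idea (the priority value and helper names 
below are assumptions on my part, so treat them as approximate):

    import scala.collection.JavaConverters._
    import org.apache.hadoop.yarn.api.records.{Priority, ResourceRequest}

    // Sketch: drop every outstanding (not-yet-satisfied) request, then submit
    // `targetNumExecutors - numExecutorsRunning` fresh requests whose locality
    // preferences come from the *current* pending tasks.
    val requestPriority = Priority.newInstance(1)  // assumed: the single priority we request at
    val outstanding = amClient
      .getMatchingRequests(requestPriority, ResourceRequest.ANY, resource)
      .asScala.flatMap(_.asScala)
    outstanding.foreach(amClient.removeContainerRequest)

    val missing = targetNumExecutors - numExecutorsRunning
    (1 to missing).foreach { _ =>
      // createContainerRequest would be handed the freshly computed node/rack preferences
      amClient.addContainerRequest(createContainerRequest(resource))
    }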


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-06-08 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r31972804
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -225,12 +243,74 @@ private[yarn] class YarnAllocator(
   logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
 s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
 
-  for (i <- 0 until missing) {
-val request = createContainerRequest(resource)
-amClient.addContainerRequest(request)
-val nodes = request.getNodes
-val hostStr = if (nodes == null || nodes.isEmpty) "Any" else nodes.last
-logInfo(s"Container request (host: $hostStr, capability: $resource)")
+  // Calculated the number of executors we expected to satisfy all the 
preferred locality tasks
+  val localityAwareTaskCores = localityAwarePendingTaskNum * 
CPUS_PER_TASK
+  val expectedLocalityAwareContainerNum =
+(localityAwareTaskCores + resource.getVirtualCores - 1) / 
resource.getVirtualCores
+
+  // Get the all the existed and locality matched containers
+  val existedMatchedContainers = allocatedHostToContainersMap.filter { 
case (host, _) =>
+preferredLocalityToCounts.contains(host)
+  }
+  val existedMatchedContainerNum = 
existedMatchedContainers.values.map(_.size).sum
+
+  // The number of containers to allocate, divided into two groups, 
one with node locality,
+  // and the other without locality preference.
+  var requiredLocalityFreeContainerNum: Int = 0
+  var requiredLocalityAwareContainerNum: Int = 0
+
+  if (expectedLocalityAwareContainerNum <= existedMatchedContainerNum) 
{
+// If the current allocated executor can satisfy all the locality 
preferred tasks,
--- End diff --

This is a little weird to me.  IIUC, what we're saying here is:
* Find all the containers from all the nodes that have at least one task 
that would be happy to be there.
* If, taken together, these containers have enough capacity to run all 
pending tasks with locality preferences, none of the executor requests we 
submit need to have locality preferences.

This would result in sub-optimal behavior in the following situation:
* We have 48 tasks with locality preferences spread across a large 
number of nodes, including one task that can run on either node 1 or node 2.
* Node 1 and node 2 each have 6 executors with 4 cores each.

In this situation, we'd end up giving up on locality-based requests entirely, even 
though it would make sense to request executors on some of the nodes that the 
tasks want to be on.
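
(Concretely, assuming CPUS_PER_TASK = 1: localityAwareTaskCores = 48, so 
expectedLocalityAwareContainerNum = ceil(48 / 4) = 12, while nodes 1 and 2 alone 
already account for 12 matched containers -- so every new request would be made 
without any locality preference.)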


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...

2015-06-08 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6679#issuecomment-109890835
  
Ah, yeah, that was the change I was referring to.

I'm not sure about the Mesos deployment model, but in Standalone mode at 
least, it would be possible to manufacture a situation where the behavior 
changes.  That situation is the Example 2 outlined above. This difference 
relies on config files with different client configs being distributed to the 
client and to the cluster nodes.  I would argue that changing behavior in this 
way is OK basically because the current behavior is wrong.  That is, client 
configs should come from the client and not from the cluster.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8099] set executor cores into system in...

2015-06-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6643#issuecomment-109383319
  
LGTM, merging


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2774] Set preferred locations for reduc...

2015-06-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6652#issuecomment-109397104
  
When there is no skew, are there situations where this would lead to worse 
performance?  E.g. will it make tasks bunch up on nodes more than before and / 
or result in scheduling delays?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2774] Set preferred locations for reduc...

2015-06-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6652#issuecomment-109398051
  
Also, is there an intuitive justification for why 5 is a good number?  It 
seems a little weird to me that it's independent of the number of tasks, the 
cluster size, and the amount of skew.  Have you considered a rule like "if X% 
of the data falls on Y% of the nodes, mark those as preferred", where X and Y 
could be 80/20 or 70/30?
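
To sketch what I mean (a hypothetical helper, names made up):

    // Sketch: prefer the smallest set of hosts that together hold at least
    // `fraction` of a reduce task's input bytes, instead of a fixed top-5.
    def preferredHosts(bytesByHost: Map[String, Long], fraction: Double): Seq[String] = {
      val total = bytesByHost.values.sum
      if (total == 0L) {
        Seq.empty
      } else {
        val threshold = fraction * total
        val sortedDesc = bytesByHost.toSeq.sortBy { case (_, bytes) => -bytes }
        // cumulative bytes *before* each host, in descending-size order
        val coveredBefore = sortedDesc.scanLeft(0L)(_ + _._2)
        sortedDesc.zip(coveredBefore)
          .takeWhile { case (_, covered) => covered < threshold }
          .map { case ((host, _), _) => host }
      }
    }

With fraction = 0.8 this behaves like the 80/20 rule above, and when the output 
is spread evenly it ends up preferring most of the cluster, i.e. effectively no 
constraint.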


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-7699][Core] Lazy start the scheduler fo...

2015-06-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6430#issuecomment-109395777
  
This looks right to me.  Merging.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2774] Set preferred locations for reduc...

2015-06-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6652#issuecomment-109450997
  
Ahhh, I misunderstood and thought that all the reduce tasks for a stage got 
the same locality preferences. I withdraw my concern.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2774] Set preferred locations for reduc...

2015-06-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6652#issuecomment-109445990
  
On the other side, I can also envision situations where it would be really 
helpful to request a higher number of preferred locations.  Consider a large 
YARN cluster where an app gets executors on a small subset of the nodes.   The 
app is idle for a while after running an initial job, so the initial set of 
executors go away.  The app wakes up later and wants to use the output of its 
first stage for another job.  If the scheduler could set preferred locations for 
those tasks on the nodes housing the outputs of the first stage, then with 
SPARK-4352 the ExecutorAllocationManager would be able to request executors on 
those nodes.

This probably makes sense as later work, but still thought it could be 
worth bringing up as part of the discussion.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-2774] Set preferred locations for reduc...

2015-06-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6652#issuecomment-109444936
  
My worry is the following situation:
* Map outputs from stage 1 exhibit little skew and are distributed evenly 
across nodes.
* 5 nodes are chosen as preferred for stage 2.  The choice is basically 
random because of the lack of skew.
* Stage 2 has fewer tasks than the number of slots.
* Instead of stage 2's tasks being distributed evenly across the cluster, all of 
them end up on the 5 nodes, which are then more likely to become bottlenecked on 
incoming network traffic.

This isn't a disaster situation by any means.  But it also seems like it 
could be easily avoided with a policy that only prefers locations when it would 
make a difference.





[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...

2015-06-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6679#issuecomment-109493444
  
Where do we end up cloning Configuration objects?  With these changes, we 
avoid loading defaults when we reconstitute Configuration objects from bytes.  
Are there hot paths where we create Configuration objects from other live 
Configuration objects?
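
For reference, the distinction I'm asking about, in terms of Hadoop's 
Configuration constructors (just a sketch):

```scala
import org.apache.hadoop.conf.Configuration

// new Configuration(false) skips loading core-default.xml / core-site.xml,
// which is what this patch takes advantage of when reconstituting a
// Configuration from serialized bytes.
val fromBytes = new Configuration(false)

// The copy constructor, by contrast, copies entries from a live
// Configuration without re-reading the XML files, so cloning an existing
// conf was never paying the XML-parsing cost in the first place.
val existing = new Configuration()        // loads defaults once
val cloned = new Configuration(existing)  // copies the live entries
```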





[GitHub] spark pull request: SPARK-8135. In SerializableWritable, don't loa...

2015-06-05 Thread sryza
GitHub user sryza opened a pull request:

https://github.com/apache/spark/pull/6679

SPARK-8135. In SerializableWritable, don't load defaults when instantiating 
Configuration

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sryza/spark sandy-spark-8135

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/6679.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6679


commit ca6554357e951807bb9bbb8c2a647b74d6954d68
Author: Sandy Ryza sa...@cloudera.com
Date:   2015-06-05T23:28:45Z

SPARK-8135. In SerializableWritable, don't load defaults when instantiating 
Configuration
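
For anyone following along, a minimal sketch of the kind of change this is 
(not the exact patch -- the class below is a simplified stand-in for 
SerializableWritable, with the deserialization path reduced to its essentials):

```scala
import java.io.{IOException, ObjectInputStream}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{ObjectWritable, Writable}

// Simplified stand-in only; the real SerializableWritable has more to it
// (writeObject, error handling, etc.).
class SerializableWritableSketch[T <: Writable](@transient var t: T)
    extends Serializable {

  @throws(classOf[IOException])
  private def readObject(in: ObjectInputStream): Unit = {
    in.defaultReadObject()
    val ow = new ObjectWritable()
    // Deserialization only needs a Configuration to instantiate the
    // Writable; skipping defaults avoids parsing core-site.xml every
    // time a task closure is deserialized.
    ow.setConf(new Configuration(false))  // previously: new Configuration()
    ow.readFields(in)
    t = ow.get().asInstanceOf[T]
  }
}
```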







[GitHub] spark pull request: [SPARK-8136][YARN] Fix flakiness in YarnCluste...

2015-06-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6680#issuecomment-109502268
  
Does this comment still apply: `// If we are running in yarn-cluster mode, 
verify that driver logs are downloadable.`?





[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...

2015-06-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6679#issuecomment-109502593
  
Makes sense.  In that case, this should be ready for review.





[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...

2015-06-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6679#issuecomment-109499883
  
In light of this change, do you think we should remove the broadcasting of 
Configurations?  While we avoid the much larger cost of reading and parsing XML 
for each task, we would still pay the cost of turning bytes into Configuration 
objects.





[GitHub] spark pull request: SPARK-8135. Don't load defaults when reconstit...

2015-06-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6679#issuecomment-109499640
  
There could be a behavior change in situations where the executor somehow has 
a different Hadoop configuration file than the driver.  But I think it's the 
right change.  I started to explain this abstractly, but I think it's easier 
to just put down some examples:

**Example 1**
core-site.xml on the driver contains optionA = value1
core-site.xml on the executor contains optionA = value2
Old behavior: on the executor, conf.get("optionA") returns value1
New behavior: same as old behavior

**Example 2**
core-site.xml on the driver does not contain optionA
core-site.xml on the executor contains optionA = value1
Old behavior: on the executor, conf.get("optionA") returns value1
New behavior: on the executor, conf.get("optionA") returns null

I can't find the JIRA, but I believe there was a recent change by @vanzin 
that made it so that the executor uses a copy of the Hadoop configuration 
files from the driver.  When that is the case, neither Example 1 nor Example 2 
can occur.
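
To spell out why Example 2 changes, here's the mechanics in terms of Hadoop's 
Configuration (sketch only; "optionA" is just the stand-in key from the 
examples above):

```scala
import org.apache.hadoop.conf.Configuration

// The old code path did roughly new Configuration(), which reads
// core-default.xml / core-site.xml from the local classpath -- so a key
// present only in the executor's core-site.xml was visible there.
val oldStyle = new Configuration(true)
println(oldStyle.get("optionA"))  // whatever the executor's core-site.xml says

// The new code path starts from new Configuration(false) and only sees
// entries that were serialized over from the driver -- so a key the driver
// never had comes back null.
val newStyle = new Configuration(false)
println(newStyle.get("optionA"))  // null unless the driver shipped it
```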




