[GitHub] spark pull request: [SPARK-7173][YARN] Add label expression suppor...

2015-11-18 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/9800#discussion_r45254024
  
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -225,7 +225,32 @@ private[spark] class Client(
 val capability = Records.newRecord(classOf[Resource])
 capability.setMemory(args.amMemory + amMemoryOverhead)
 capability.setVirtualCores(args.amCores)
-appContext.setResource(capability)
+
+if (sparkConf.contains("spark.yarn.am.nodeLabelExpression")) {
+  try {
+val amRequest = Records.newRecord(classOf[ResourceRequest])
+amRequest.setResourceName(ResourceRequest.ANY)
+amRequest.setPriority(Priority.newInstance(0))
+amRequest.setCapability(capability)
+amRequest.setNumContainers(1)
+amRequest.setRelaxLocality(true)
--- End diff --

Is this required?  It should be the default.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-11410] [PYSPARK] Add python bindings fo...

2015-11-05 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/9504#issuecomment-154201525
  
Title should have the [SQL] tag as well





[GitHub] spark pull request: [SPARK-10658][SPARK-11421][PYSPARK][CORE] Prov...

2015-11-04 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/9313#discussion_r43909799
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1611,8 +1611,14 @@ class SparkContext(config: SparkConf) extends 
Logging with ExecutorAllocationCli
* Adds a JAR dependency for all tasks to be executed on this 
SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or 
other Hadoop-supported
* filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on 
every worker node.
+   * If addToCurrentThread is true, attempt to add the new class to the 
current threads class
+   * loader.
*/
   def addJar(path: String) {
+addJar(path, false)
+  }
+
+  def addJar(path: String, addToCurrentThread: Boolean) {
--- End diff --

In that case, maybe good to call this out explicitly in the doc?  Also, the 
naming `addToCurrentThread` makes it seem like it'll get added *only* to the 
current thread, so maybe something like `addToContextClassLoader` would be more 
clear.





[GitHub] spark pull request: [SPARK-10658][SPARK-11421][PYSPARK][CORE] Prov...

2015-11-03 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/9313#discussion_r43726053
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1611,8 +1611,14 @@ class SparkContext(config: SparkConf) extends 
Logging with ExecutorAllocationCli
* Adds a JAR dependency for all tasks to be executed on this 
SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or 
other Hadoop-supported
* filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on 
every worker node.
+   * If addToCurrentThread is true, attempt to add the new class to the 
current threads class
+   * loader.
*/
   def addJar(path: String) {
+addJar(path, false)
+  }
+
+  def addJar(path: String, addToCurrentThread: Boolean) {
--- End diff --

Would it be correct to say that in nearly all cases, setting the second 
argument to true will result in the jar being added to all of the application's 
threads?  Because Spark sets the context classloader to a MutableClassLoader 
before loading the application's main class, and then all other app threads 
will inherit this as the default unless they explicitly change the context 
class loader?
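
As an aside, a minimal standalone sketch of the inheritance behavior described above (illustrative names only, not Spark code): a thread created after the context classloader is set inherits it by default.

    import java.net.{URL, URLClassLoader}

    object ContextClassLoaderInheritance {
      def main(args: Array[String]): Unit = {
        // Stand-in for Spark's MutableURLClassLoader: install it as the context
        // classloader of the main thread before any other threads are created.
        val loader = new URLClassLoader(Array.empty[URL], getClass.getClassLoader)
        Thread.currentThread().setContextClassLoader(loader)

        // A thread created afterwards inherits that context classloader by default,
        // unless it explicitly sets its own.
        val child = new Thread(new Runnable {
          override def run(): Unit = {
            val inherited = Thread.currentThread().getContextClassLoader
            println(s"child inherited the same loader: ${inherited eq loader}") // true
          }
        })
        child.start()
        child.join()
      }
    }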





[GitHub] spark pull request: [SPARK-10658][SPARK-11421][PYSPARK][CORE] Prov...

2015-11-03 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/9313#discussion_r43726148
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1667,6 +1673,13 @@ class SparkContext(config: SparkConf) extends 
Logging with ExecutorAllocationCli
   if (key != null) {
 addedJars(key) = System.currentTimeMillis
 logInfo("Added JAR " + path + " at " + key + " with timestamp " + 
addedJars(key))
+if (addToCurrentThread) {
+  val currentCL = Utils.getContextOrSparkClassLoader
+  currentCL match {
+case cl: MutableURLClassLoader =>  cl.addURL(new 
URI(key).toURL())
+case _ => logWarning(s"Unsupported cl $currentCL will not 
update jars for current thread")
--- End diff --

My opinion is that it's better to throw an exception here.  Though as 
that's an API behavior question, probably best to defer to a core maintainer?  
@pwendell any thoughts?  And thoughts on adding this option to `addJar` in 
general?





[GitHub] spark pull request: [SPARK-10658][SPARK-11421][PYSPARK][CORE] Prov...

2015-11-02 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/9313#discussion_r43682628
  
--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -175,6 +175,29 @@ private[spark] object Utils extends Logging {
   }
 
   /**
+   * Update the current threads class loader.
+   * Requires the current class loader is a MutableURLClassLoader, 
otherwise skips updating with a
+   * warning. Intended for use by addJar(), although constructing an 
instance of the class will
+   * still require:
+   * 
sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass("class
 name")
+   * as described in SPARK-5185.
+   */
+  def updatePrimaryClassLoader(sc: SparkContext) {
--- End diff --

What's the advantage of this update method that compares the full contents 
of `sc.addedJars` to those already registered with the current classloader, vs. 
just calling `cl.addURL` directly on the jar being added?

A possibly confusing effect of this approach is that if I have code like:

    sc.addJar(jar1)
    ... other stuff blah blah blah ...
    sc.addJar(jar2, true)

jar1 will get added to the classpath only when `sc.addJar` is called the 
second time.
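
For illustration, a sketch of the per-jar alternative suggested above (standalone and hypothetical: `AddableClassLoader` stands in for Spark's MutableURLClassLoader, and the helper name is made up).  Only the jar just added is pushed onto the current thread's classloader, so the ordering surprise above goes away:

    import java.net.{URI, URL, URLClassLoader}

    // Stand-in for Spark's MutableURLClassLoader, which simply exposes addURL.
    class AddableClassLoader(parent: ClassLoader)
      extends URLClassLoader(Array.empty[URL], parent) {
      override def addURL(url: URL): Unit = super.addURL(url)
    }

    object AddJarSketch {
      // Push just this jar's URL onto the current thread's classloader, rather
      // than re-syncing the full contents of sc.addedJars on every call.
      def addJarToCurrentThread(jarUri: String): Unit =
        Thread.currentThread().getContextClassLoader match {
          case cl: AddableClassLoader => cl.addURL(new URI(jarUri).toURL())
          case other => println(s"Unsupported class loader $other; skipping")
        }
    }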





[GitHub] spark pull request: SPARK-4921. TaskSetManager.dequeueTask returns...

2015-11-02 Thread sryza
GitHub user sryza reopened a pull request:

https://github.com/apache/spark/pull/3816

SPARK-4921. TaskSetManager.dequeueTask returns PROCESS_LOCAL for NO_PREF tasks

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sryza/spark sandy-spark-4921

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/3816.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #3816


commit 247ce5587b8a06fe586a2730f1ad2df4ab7f79dc
Author: Sandy Ryza <sa...@cloudera.com>
Date:   2014-12-28T03:55:47Z

SPARK-4921. TaskSetManager.dequeueTask returns PROCESS_LOCAL for NO_PREF tasks







[GitHub] spark pull request: SPARK-4921. TaskSetManager.dequeueTask returns...

2015-11-02 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/3816#issuecomment-153165000
  
@zsxwing I have sadly lost most of the context on this issue and don't have 
time to pick it back up at the moment.

What you point out does seem to be an issue, though.  Definitely feel free 
to take my patch if you think it's the right fix.





[GitHub] spark pull request: [SPARK-10658][PYSPARK] Provide add jars to py ...

2015-10-29 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/9313#issuecomment-152234819
  
Could we add that flag to the Scala API as well?  Would that break binary 
compatibility?
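
For context on the binary-compatibility question, a hedged sketch (the class name is hypothetical, not the actual SparkContext change): adding a second overload leaves the original single-argument method in the compiled class, so previously compiled callers keep linking, whereas replacing it with a single method that has a default parameter would change the compiled signature.

    // Hypothetical illustration of the overload approach; not the real SparkContext.
    class JarRegistry {
      // The existing signature stays in the bytecode, so already-compiled callers
      // of addJar(path) continue to link.
      def addJar(path: String): Unit = addJar(path, addToCurrentThread = false)

      // New overload carrying the extra flag.
      def addJar(path: String, addToCurrentThread: Boolean): Unit = {
        println(s"registering $path (update current thread: $addToCurrentThread)")
      }
    }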





[GitHub] spark pull request: [SPARK-10658][PYSPARK] Provide add jars to py ...

2015-10-28 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/9313#issuecomment-152069509
  
Does the Scala SparkContext#addJar add the jar to the driver classpath?  My 
impression was that it does not.  If so, this would be a little inconsistent, 
right?





[GitHub] spark pull request: [SPARK-10658][PYSPARK] Provide add jars to py ...

2015-10-28 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/9313#discussion_r43347726
  
--- Diff: python/pyspark/context.py ---
@@ -806,6 +806,24 @@ def addPyFile(self, path):
 import importlib
 importlib.invalidate_caches()
 
+def addJar(self, path):
+"""
+Adds a JAR dependency for all tasks to be executed on this 
SparkContext in the future.
+The `path` passed can be either a local file, a file in HDFS (or 
other Hadoop-supported
+filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file 
on every worker node.
+"""
+self._jsc.sc().addJar(path)
+self._jvm.PythonUtils.updatePrimaryClassLoader(self._jsc)
+
+def _loadClass(self, className):
+"""
+.. note:: Experimental
+
+Loads a JVM class using the MutableClass loader used by spark.
+This function exists because Py4J uses a different class loader.
+"""
+
self._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(className)
--- End diff --

In Utils.getContextOrSparkClassLoader, it seems like the context 
classloader can sometimes be null.  Is that possible here as well?
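
For reference, a small sketch of the null-safe lookup pattern being referred to (it mirrors the fallback in `Utils.getContextOrSparkClassLoader`; the object name here is made up):

    object ClassLoaderLookup {
      // A thread's context classloader can legitimately be null; fall back to the
      // loader that loaded this class in that case.
      def contextOrDefaultClassLoader: ClassLoader =
        Option(Thread.currentThread().getContextClassLoader)
          .getOrElse(getClass.getClassLoader)
    }

A similar guard (or an explicit null check) before calling `loadClass` on the Python side would avoid a NullPointerException when no context classloader is set.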





[GitHub] spark pull request: [SPARK-10921] [YARN] Completely remove the use...

2015-10-06 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/8976#issuecomment-145976915
  
Taking these out seems like the right thing to do to me





[GitHub] spark pull request: [SPARK-9585] add config to enable inputFormat ...

2015-09-22 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7918#issuecomment-142201185
  
Ah, thanks for digging that up @rxin.  In light of that, I definitely don't 
see a compelling reason to keep the input format caching around.





[GitHub] spark pull request: [SPARK-9585] add config to enable inputFormat ...

2015-09-15 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7918#issuecomment-140575553
  
Ah, yeah, it does look like ReflectionUtils.newInstance caches the 
constructor.  In that case I'd be OK with removing the input format cache 
entirely.  Any thoughts @JoshRosen ?





[GitHub] spark pull request: SPARK-10611 Clone Configuration for each task ...

2015-09-15 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/8763#issuecomment-140519129
  
Oh, never mind, sorry, this is off by default.





[GitHub] spark pull request: SPARK-10611 Clone Configuration for each task ...

2015-09-15 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/8763#issuecomment-140518870
  
This seems like it could have a pretty serious perf impact.  Are we able to 
do some benchmarking to assess this? 





[GitHub] spark pull request: [SPARK-10288][Yarn] Add REST client for Spark ...

2015-09-08 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/8504#issuecomment-138678684
  
My reaction is similar to Andrew's.  I haven't come across much user pain around 
the way that Spark submits apps to YARN.  If there are particular use cases where the 
current way of doing things causes problems, I'd be willing to reconsider.





[GitHub] spark pull request: [SPARK-9585] add config to enable inputFormat ...

2015-09-08 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7918#issuecomment-138687768
  
Sorry for the delay here, have been on PTO.

IIUC, the change here makes Spark work with some exotic InputFormats that 
it previously did not work with due to thread-safety issues, at the possible expense 
of performance.  Users can revert to the old behavior with a config.

There's no associated JIRA, but a6eeb5ffd54956667ec4e793149fdab90041ad6c is 
the hash of the change that appears to have introduced the input format cache.  
Unfortunately, I don't see any performance numbers there justifying its 
addition.

It seems like the main overhead we're trying to avoid is the reflective 
calls.  What about caching the constructor so that we don't need to look it up 
for each task?

Lastly, is an equivalent change needed for NewHadoopRDD?
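
To make the constructor-caching idea concrete, a rough standalone sketch (object and method names are made up; this is not the HadoopRDD code).  Only the reflective lookup is shared; each call, i.e. each task, still gets its own InputFormat instance:

    import java.lang.reflect.Constructor

    import scala.collection.concurrent.TrieMap

    object ConstructorCache {
      // Cache the reflective Constructor per class so the lookup happens once.
      private val cache = new TrieMap[Class[_], Constructor[_]]()

      def newInstance[T](clazz: Class[T]): T = {
        val ctor = cache.getOrElseUpdate(clazz, {
          val c = clazz.getDeclaredConstructor()
          c.setAccessible(true)
          c
        })
        // A fresh, thread-confined instance per call.
        ctor.newInstance().asInstanceOf[T]
      }
    }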





[GitHub] spark pull request: [SPARK-9969][YARN] Remove old MR classpath API...

2015-08-18 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/8192#issuecomment-132380275
  
Yeah, it's needed for the InputFormats that Spark relies on to read Hadoop 
data.





[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...

2015-08-18 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/8072#issuecomment-132214961
  
@tgravescs my thinking for just `spark.yarn.tags` was that it's redundant to 
include "application", because configs are by definition per-application.  We'd 
also be consistent with `spark.yarn.queue` and `spark.yarn.jars`.  If we do include 
"application", I think it should be `spark.yarn.application-tags`, because then 
we're not adding a new "application" namespace.





[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...

2015-08-18 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/8072#issuecomment-132218054
  
Cool, in that case @dennishuo mind making the change that @vanzin suggested 
and then I'll merge this?





[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-08-17 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7075#issuecomment-131819801
  
@mengxr is it too late to get this in to 1.5?

@josepablocam are you able to resolve merge conflicts?





[GitHub] spark pull request: SPARK-7707. User guide and example code for Ke...

2015-08-16 Thread sryza
GitHub user sryza opened a pull request:

https://github.com/apache/spark/pull/8230

SPARK-7707. User guide and example code for KernelDensity



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sryza/spark sandy-spark-7707

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/8230.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #8230


commit 5120a645d11378d4c7a2ae634082b46469810632
Author: Sandy Ryza <sa...@cloudera.com>
Date:   2015-08-16T12:37:03Z

SPARK-7707. User guide and example code for KernelDensity







[GitHub] spark pull request: [SPARK-9570][Docs][YARN]Consistent recommendat...

2015-08-16 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/8071#issuecomment-131547053
  
Sorry all for the delay, have been in Indonesia for the week.

I'm also in favor of the standard way being `--deploy-mode`.  This keeps 
things consistent with standalone and Mesos.  I think it's probably fine if in 
some places in the docs we use "yarn-client" as shorthand to refer to running 
on YARN with the client deploy mode, but in the actual instructions on how to 
launch, my personal preference would be `--master yarn --deploy-mode 
client/cluster`.





[GitHub] spark pull request: SPARK-7707. User guide and example code for Ke...

2015-08-16 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/8230#issuecomment-131547427
  
retest this please





[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...

2015-08-16 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/8072#issuecomment-131679667
  
@tgravescs @vanzin @andrewor14 can I get one of you to sign off on my 
proposed property name `spark.yarn.tags`?

Otherwise, this LGTM





[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...

2015-08-12 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/8072#issuecomment-130196650
  
jenkins, retest this please





[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...

2015-08-12 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/8072#discussion_r36833771
  
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -163,6 +163,23 @@ private[spark] class Client(
 appContext.setQueue(args.amQueue)
 appContext.setAMContainerSpec(containerContext)
 appContext.setApplicationType("SPARK")
+sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
+  .map(StringUtils.getTrimmedStringCollection(_))
+  .filter(!_.isEmpty())
+  .foreach { tagCollection =>
+try {
+  // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
+  // reflection to set it, printing a warning if a tag was specified but the YARN version
+  // doesn't support it.
+  val method = appContext.getClass().getMethod(
+"setApplicationTags", classOf[java.util.Set[String]])
+  method.invoke(appContext, new java.util.HashSet[String](tagCollection))
+} catch {
+  case e: NoSuchMethodException =>
+logWarning("Ignoring %s='%s' because this version of YARN does not support it"
--- End diff --

Nit: use `s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because"`...





[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...

2015-08-10 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/8072#discussion_r36708697
  
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -163,6 +163,23 @@ private[spark] class Client(
 appContext.setQueue(args.amQueue)
 appContext.setAMContainerSpec(containerContext)
 appContext.setApplicationType("SPARK")
+sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
+  .map(StringUtils.getTrimmedStringCollection(_))
+  .filter(!_.isEmpty())
+  .foreach { tagCollection =>
+try {
+  val method = appContext.getClass().getMethod(
+"setApplicationTags", classOf[java.util.Set[String]])
+  method.invoke(appContext, new java.util.HashSet[String](tagCollection))
+  logInfo("Applied setApplicationTags based on %s='%s'"
+.format(CONF_SPARK_YARN_APPLICATION_TAGS, tagCollection))
+
--- End diff --

Extra space unnecessary





[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...

2015-08-10 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/8072#discussion_r36708684
  
--- Diff: docs/running-on-yarn.md ---
@@ -320,6 +320,14 @@ If you need a reference to the proper location to put log files in the YARN so t
   </td>
 </tr>
 <tr>
+  <td><code>spark.yarn.application.tags</code></td>
--- End diff --

I would change this to simply `spark.yarn.tags`





[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...

2015-08-10 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/8072#discussion_r36708654
  
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -163,6 +163,23 @@ private[spark] class Client(
 appContext.setQueue(args.amQueue)
 appContext.setAMContainerSpec(containerContext)
 appContext.setApplicationType("SPARK")
+sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
+  .map(StringUtils.getTrimmedStringCollection(_))
+  .filter(!_.isEmpty())
+  .foreach { tagCollection =>
+try {
+  val method = appContext.getClass().getMethod(
--- End diff --

Add a comment here mentioning why this needs to be called via reflection





[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...

2015-08-10 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/8072#discussion_r36708634
  
--- Diff: docs/running-on-yarn.md ---
@@ -320,6 +320,14 @@ If you need a reference to the proper location to put log files in the YARN so t
   </td>
 </tr>
 <tr>
+  <td><code>spark.yarn.application.tags</code></td>
+  <td>(none)</td>
+  <td>
+  Comma-separated list of strings to pass through as YARN application tags appearing
+  in YARN ApplicationReports, which can be used for filtering when querying YARN.
--- End diff --

when querying YARN apps





[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...

2015-08-10 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/8072#discussion_r36708965
  
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -163,6 +163,23 @@ private[spark] class Client(
 appContext.setQueue(args.amQueue)
 appContext.setAMContainerSpec(containerContext)
 appContext.setApplicationType("SPARK")
+sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
+  .map(StringUtils.getTrimmedStringCollection(_))
+  .filter(!_.isEmpty())
+  .foreach { tagCollection =>
+try {
+  val method = appContext.getClass().getMethod(
+"setApplicationTags", classOf[java.util.Set[String]])
+  method.invoke(appContext, new java.util.HashSet[String](tagCollection))
+  logInfo("Applied setApplicationTags based on %s='%s'"
--- End diff --

I'd omit this log message.  I don't think it's more important than other 
attributes like queue and app name, which we don't log.





[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...

2015-08-10 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/8072#discussion_r36709001
  
--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -163,6 +163,23 @@ private[spark] class Client(
 appContext.setQueue(args.amQueue)
 appContext.setAMContainerSpec(containerContext)
 appContext.setApplicationType("SPARK")
+sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
+  .map(StringUtils.getTrimmedStringCollection(_))
+  .filter(!_.isEmpty())
+  .foreach { tagCollection =>
+try {
+  val method = appContext.getClass().getMethod(
+"setApplicationTags", classOf[java.util.Set[String]])
+  method.invoke(appContext, new java.util.HashSet[String](tagCollection))
+  logInfo("Applied setApplicationTags based on %s='%s'"
+.format(CONF_SPARK_YARN_APPLICATION_TAGS, tagCollection))
+
+} catch {
+  case e: NoSuchMethodException =>
+logWarning("Ignoring conf %s='%s'; setApplicationTags missing in this version of YARN."
--- End diff --

I'd say "Ignoring spark.yarn.tags because this version of YARN does not 
support it"





[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...

2015-08-10 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/8072#discussion_r36709027
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala ---
@@ -170,6 +173,39 @@ class ClientSuite extends SparkFunSuite with Matchers 
with BeforeAndAfterAll {
 cp should contain ("/remotePath/my1.jar")
   }
 
+  test("configuration and args propagate through createApplicationSubmissionContext") {
--- End diff --

Have you verified that this test passes both against versions of YARN that 
support app tags and against versions that do not?





[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-08-04 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7075#issuecomment-127758361
  
This LGTM pending jenkins





[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-08-04 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7075#issuecomment-127758313
  
jenkins, test this please





[GitHub] spark pull request: SPARK-8949 - Print warnings when using preferr...

2015-08-03 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7874#issuecomment-127438873
  
jenkins, test this please





[GitHub] spark pull request: SPARK-8949 - Print warnings when using preferr...

2015-08-03 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7874#discussion_r36062508
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -113,6 +113,7 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   /**
* :: DeveloperApi ::
* Alternative constructor for setting preferred locations where Spark 
will create executors.
+   * Note that preferred locations feature does not work as it is supposed 
to, see SPARK-8949
--- End diff --

I would be more explicit and say that passing in preferred node location 
data has no effect at all.





[GitHub] spark pull request: SPARK-8949 - Print warnings when using preferr...

2015-08-03 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7874#discussion_r36062537
  
--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -153,6 +155,8 @@ class SparkContext(config: SparkConf) extends Logging 
with ExecutorAllocationCli
   preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) =
   {
 this(SparkContext.updatedConf(new SparkConf(), master, appName, 
sparkHome, jars, environment))
+if (preferredNodeLocationData.size > 0)
--- End diff --

Use curly braces around the if block





[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-08-03 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r36125365
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
 ---
@@ -53,6 +53,7 @@ private[stat] object KolmogorovSmirnovTest extends 
Logging {
   object NullHypothesis extends Enumeration {
 type NullHypothesis = Value
 val OneSampleTwoSided = Value("Sample follows theoretical distribution")
+val TwoSampleTwoSided = Value("Both samples follow the same distribution")
--- End diff --

Nit: I'd make the above one "Sample follows *the* theoretical distribution" 
or make the bottom one "Both samples follow same distribution".





[GitHub] spark pull request: [SPARK-9375] Make sure the total number of exe...

2015-08-03 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7716#issuecomment-127390297
  
@andrewor14, the code that @KaiXinXiaoLei suggests fixing seems to be code 
most recently updated in SPARK-8119.





[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-08-03 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r36125136
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -254,4 +254,115 @@ class HypothesisTestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 assert(rCompResult.statistic ~== rKSStat relTol 1e-4)
 assert(rCompResult.pValue ~== rKSPVal relTol 1e-4)
   }
+
--- End diff --

If making the methods package private would simplify the test, I think it's 
reasonable to do so





[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939544
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
 ---
@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends 
Logging {
 val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
 new KolmogorovSmirnovTestResult(pval, ksStat, 
NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which 
tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return 
[[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): 
KolmogorovSmirnovTestResult = {
+val n1 = data1.count().toDouble
+val n2 = data2.count().toDouble
+val isSample1 = true // identifier for sample 1, needed after co-sort
+// combine identified samples
+val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1))
+// co-sort and operate on each partition
+val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions { part =>
+  searchTwoSampleCandidates(part, n1, n2) // local extrema
+}.collect()
+val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: 
global extreme
+evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts from each sample 
within one partition for
+   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition 
of the co-sorted RDDs,
+   *each element is additionally tagged with a boolean 
flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element 
is an unadjusted minimum
+   *distance , the second is an unadjusted maximum distance (both 
of which will later
+   *be adjusted by a constant to account for elements in prior 
partitions), and a
+   *count corresponding to the numerator of the adjustment 
constant coming from this
+   *partition
+   */
+  private def searchTwoSampleCandidates(
+  partData: Iterator[(Double, Boolean)],
+  n1: Double,
+  n2: Double)
+: Iterator[(Double, Double, Double)] = {
+// fold accumulator: local minimum, local maximum, index for sample 1, 
index for sample2
+case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int)
+val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0)
+// traverse the data in partition and calculate distances and counts
+val pResults = partData.foldLeft(initAcc) { case (acc: KS2Acc, (v, isSample1)) =>
+  val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
+  val cdf1 = (acc.ix1 + add1) / n1
+  val cdf2 = (acc.ix2 + add2) / n2
+  val dist = cdf1 - cdf2
+  KS2Acc(math.min(acc.min, dist), math.max(acc.max, dist), acc.ix1 + 
add1, acc.ix2 + add2)
+}
+val results = if (pResults == initAcc) {
+  Array[(Double, Double, Double)]()
+} else {
+  Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2  -  
(pResults.ix2 + 1) * n1))
+}
+results.iterator
+  }
+
+  /**
+   * Adjust candidate extremes by the appropriate constant. The resulting 
maximum corresponds to
+   * the two-sample, two-sided Kolmogorov-Smirnov test
+   * @param localData `Array[(Double, Double, Double)]` contains the 
candidate extremes from each
+   * partition, along with the numerator for the necessary 
constant adjustments
+   * @param n `Double` The denominator in the constant adjustment (i.e. 
(size of sample 1 ) * (size
+   * of sample 2))
+   * @return The two-sample, two-sided Kolmogorov-Smirnov statistic
+   */
+  private def searchTwoSampleStatistic(localData: Array[(Double, Double, 
Double)], n: Double)
+: Double = {
+val initAcc = (Double.MinValue, 0.0) // maximum distance and numerator 
for constant adjustment
+// adjust differences based on the number of elements preceding it, 
which should provide
+// the correct distance between the 2 empirical CDFs
+val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
+val adjConst = prevCt / n
+val dist1 = math.abs(minCand

[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939524
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
 ---
@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends 
Logging {
 val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
 new KolmogorovSmirnovTestResult(pval, ksStat, 
NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which 
tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return 
[[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): 
KolmogorovSmirnovTestResult = {
+val n1 = data1.count().toDouble
+val n2 = data2.count().toDouble
+val isSample1 = true // identifier for sample 1, needed after co-sort
+// combine identified samples
+val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1))
+// co-sort and operate on each partition
+val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions { part =>
+  searchTwoSampleCandidates(part, n1, n2) // local extrema
+}.collect()
+val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: 
global extreme
+evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts from each sample 
within one partition for
+   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition 
of the co-sorted RDDs,
+   *each element is additionally tagged with a boolean 
flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element 
is an unadjusted minimum
+   *distance , the second is an unadjusted maximum distance (both 
of which will later
+   *be adjusted by a constant to account for elements in prior 
partitions), and a
+   *count corresponding to the numerator of the adjustment 
constant coming from this
+   *partition
+   */
+  private def searchTwoSampleCandidates(
+  partData: Iterator[(Double, Boolean)],
+  n1: Double,
+  n2: Double)
+: Iterator[(Double, Double, Double)] = {
+// fold accumulator: local minimum, local maximum, index for sample 1, 
index for sample2
+case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int)
+val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0)
+// traverse the data in partition and calculate distances and counts
+val pResults = partData.foldLeft(initAcc) { case (acc: KS2Acc, (v, isSample1)) =>
+  val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
+  val cdf1 = (acc.ix1 + add1) / n1
+  val cdf2 = (acc.ix2 + add2) / n2
+  val dist = cdf1 - cdf2
+  KS2Acc(math.min(acc.min, dist), math.max(acc.max, dist), acc.ix1 + 
add1, acc.ix2 + add2)
+}
+val results = if (pResults == initAcc) {
+  Array[(Double, Double, Double)]()
+} else {
+  Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2  -  
(pResults.ix2 + 1) * n1))
+}
+results.iterator
+  }
+
+  /**
+   * Adjust candidate extremes by the appropriate constant. The resulting 
maximum corresponds to
+   * the two-sample, two-sided Kolmogorov-Smirnov test
+   * @param localData `Array[(Double, Double, Double)]` contains the 
candidate extremes from each
+   * partition, along with the numerator for the necessary 
constant adjustments
+   * @param n `Double` The denominator in the constant adjustment (i.e. 
(size of sample 1 ) * (size
+   * of sample 2))
+   * @return The two-sample, two-sided Kolmogorov-Smirnov statistic
+   */
+  private def searchTwoSampleStatistic(localData: Array[(Double, Double, 
Double)], n: Double)
+: Double = {
+val initAcc = (Double.MinValue, 0.0) // maximum distance and numerator 
for constant adjustment
--- End diff --

Place this comment on the line above



[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939958
  
--- Diff: 
mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
@@ -254,4 +254,115 @@ class HypothesisTestSuite extends SparkFunSuite with 
MLlibTestSparkContext {
 assert(rCompResult.statistic ~== rKSStat relTol 1e-4)
 assert(rCompResult.pValue ~== rKSPVal relTol 1e-4)
   }
+
--- End diff --

Are there weird edge cases here with the partitioning?  E.g. where a 
partition has no elements?  Or only elements from one sample?  Can we provide 
tests for them?
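
To sketch what such a test could look like (assuming the two-sample entry point added in this PR, `KolmogorovSmirnovTest.testTwoSamples`, and a suite like HypothesisTestSuite that lives in org.apache.spark.mllib.stat and already has an `sc` available): using more partitions than elements forces empty partitions as well as partitions holding values from only one sample.

    // Hypothetical edge-case test: more partitions than elements, so some
    // partitions of the co-sorted union are empty and others contain values
    // from only one of the two samples.
    val sample1 = sc.parallelize(Seq(0.1, 0.2, 0.3), 10)
    val sample2 = sc.parallelize(Seq(0.15, 0.25, 0.35), 10)
    val result = KolmogorovSmirnovTest.testTwoSamples(sample1, sample2)
    assert(result.pValue >= 0.0 && result.pValue <= 1.0)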





[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939346
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
 ---
@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends 
Logging {
 val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
 new KolmogorovSmirnovTestResult(pval, ksStat, 
NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which 
tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return 
[[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): 
KolmogorovSmirnovTestResult = {
+val n1 = data1.count().toDouble
+val n2 = data2.count().toDouble
+val isSample1 = true // identifier for sample 1, needed after co-sort
+// combine identified samples
+val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, 
!isSample1))
--- End diff --

Call this `unionedData`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939722
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
 ---
@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends 
Logging {
 val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
 new KolmogorovSmirnovTestResult(pval, ksStat, 
NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which 
tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return 
[[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): 
KolmogorovSmirnovTestResult = {
+val n1 = data1.count().toDouble
+val n2 = data2.count().toDouble
+val isSample1 = true // identifier for sample 1, needed after co-sort
+// combine identified samples
+val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, 
!isSample1))
+// co-sort and operate on each partition
+val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions 
{ part =>
+  searchTwoSampleCandidates(part, n1, n2) // local extrema
+}.collect()
+val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: 
global extreme
+evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts from each sample 
within one partition for
+   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition 
of the co-sorted RDDs,
+   *each element is additionally tagged with a boolean 
flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element 
is an unadjusted minimum
+   *distance , the second is an unadjusted maximum distance (both 
of which will later
+   *be adjusted by a constant to account for elements in prior 
partitions), and a
+   *count corresponding to the numerator of the adjustment 
constant coming from this
+   *partition
+   */
+  private def searchTwoSampleCandidates(
+  partData: Iterator[(Double, Boolean)],
+  n1: Double,
+  n2: Double)
+: Iterator[(Double, Double, Double)] = {
+// fold accumulator: local minimum, local maximum, index for sample 1, 
index for sample2
+case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int)
+val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0)
+// traverse the data in partition and calculate distances and counts
--- End diff --

nit: in *the* partition


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939695
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
 ---
@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends 
Logging {
 val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
 new KolmogorovSmirnovTestResult(pval, ksStat, 
NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which 
tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return 
[[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): 
KolmogorovSmirnovTestResult = {
+val n1 = data1.count().toDouble
+val n2 = data2.count().toDouble
+val isSample1 = true // identifier for sample 1, needed after co-sort
+// combine identified samples
+val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, 
!isSample1))
+// co-sort and operate on each partition
+val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions 
{ part =>
+  searchTwoSampleCandidates(part, n1, n2) // local extrema
+}.collect()
+val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: 
global extreme
+evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts from each sample 
within one partition for
+   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition 
of the co-sorted RDDs,
+   *each element is additionally tagged with a boolean 
flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element 
is an unadjusted minimum
+   *distance , the second is an unadjusted maximum distance (both 
of which will later
+   *be adjusted by a constant to account for elements in prior 
partitions), and a
+   *count corresponding to the numerator of the adjustment 
constant coming from this
+   *partition
+   */
+  private def searchTwoSampleCandidates(
+  partData: Iterator[(Double, Boolean)],
+  n1: Double,
+  n2: Double)
+: Iterator[(Double, Double, Double)] = {
+// fold accumulator: local minimum, local maximum, index for sample 1, 
index for sample2
+case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int)
+val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0)
+// traverse the data in partition and calculate distances and counts
+val pResults = partData.foldLeft(initAcc) { case (acc: KS2Acc, (v, 
isSample1)) =>
+  val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
+  val cdf1 = (acc.ix1 + add1) / n1
+  val cdf2 = (acc.ix2 + add2) / n2
+  val dist = cdf1 - cdf2
+  KS2Acc(math.min(acc.min, dist), math.max(acc.max, dist), acc.ix1 + 
add1, acc.ix2 + add2)
+}
+val results = if (pResults == initAcc) {
+  Array[(Double, Double, Double)]()
+} else {
+  Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2  -  
(pResults.ix2 + 1) * n1))
--- End diff --

Looks like there are a couple spots with two consecutive spaces in here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939844
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
 ---
@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends 
Logging {
 val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
 new KolmogorovSmirnovTestResult(pval, ksStat, 
NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which 
tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return 
[[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): 
KolmogorovSmirnovTestResult = {
+val n1 = data1.count().toDouble
+val n2 = data2.count().toDouble
+val isSample1 = true // identifier for sample 1, needed after co-sort
+// combine identified samples
+val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, 
!isSample1))
+// co-sort and operate on each partition
+val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions 
{ part =>
+  searchTwoSampleCandidates(part, n1, n2) // local extrema
+}.collect()
+val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: 
global extreme
+evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts from each sample 
within one partition for
+   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition 
of the co-sorted RDDs,
+   *each element is additionally tagged with a boolean 
flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element 
is an unadjusted minimum
+   *distance , the second is an unadjusted maximum distance (both 
of which will later
+   *be adjusted by a constant to account for elements in prior 
partitions), and a
+   *count corresponding to the numerator of the adjustment 
constant coming from this
--- End diff --

Can we describe in a little more detail what the mentioned count is a count 
of?  Or if it's described somewhere else in the code, reference it?
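
For example, the @return could spell it out roughly like this (wording is only a
suggestion):

   *          and a count that is the numerator of the adjustment constant this
   *          partition contributes: it reflects how many sample-1 and sample-2
   *          elements were seen here, scaled by n2 and n1 respectively, so that
   *          CDF differences computed in later partitions can be shifted by
   *          count / (n1 * n2)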


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939884
  
--- Diff: docs/mllib-statistics.md ---
@@ -431,11 +431,16 @@ user tests against the normal distribution 
(`distName="norm"`), but does not provide
 parameters, the test initializes to the standard normal distribution and 
logs an appropriate 
 message.
 
+There is also a 2-sample, 2-sided implementation available, which tests if 
the 2 samples are drawn
--- End diff --

rather than "which tests if", I'd mention the null hypothesis.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939785
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
 ---
@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends 
Logging {
 val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
 new KolmogorovSmirnovTestResult(pval, ksStat, 
NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which 
tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return 
[[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): 
KolmogorovSmirnovTestResult = {
+val n1 = data1.count().toDouble
+val n2 = data2.count().toDouble
+val isSample1 = true // identifier for sample 1, needed after co-sort
+// combine identified samples
+val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, 
!isSample1))
+// co-sort and operate on each partition
+val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions 
{ part =>
+  searchTwoSampleCandidates(part, n1, n2) // local extrema
+}.collect()
+val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: 
global extreme
+evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts from each sample 
within one partition for
+   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition 
of the co-sorted RDDs,
+   *each element is additionally tagged with a boolean 
flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element 
is an unadjusted minimum
+   *distance , the second is an unadjusted maximum distance (both 
of which will later
+   *be adjusted by a constant to account for elements in prior 
partitions), and a
+   *count corresponding to the numerator of the adjustment 
constant coming from this
+   *partition
+   */
+  private def searchTwoSampleCandidates(
+  partData: Iterator[(Double, Boolean)],
+  n1: Double,
+  n2: Double)
+: Iterator[(Double, Double, Double)] = {
+// fold accumulator: local minimum, local maximum, index for sample 1, 
index for sample2
+case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int)
+val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0)
+// traverse the data in partition and calculate distances and counts
+val pResults = partData.foldLeft(initAcc) { case (acc: KS2Acc, (v, 
isSample1)) =>
+  val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
+  val cdf1 = (acc.ix1 + add1) / n1
+  val cdf2 = (acc.ix2 + add2) / n2
+  val dist = cdf1 - cdf2
+  KS2Acc(math.min(acc.min, dist), math.max(acc.max, dist), acc.ix1 + 
add1, acc.ix2 + add2)
+}
+val results = if (pResults == initAcc) {
--- End diff --

Do things go bad if we don't special case here?  Or is it just less 
efficient?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939573
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
 ---
@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends 
Logging {
 val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
 new KolmogorovSmirnovTestResult(pval, ksStat, 
NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which 
tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return 
[[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): 
KolmogorovSmirnovTestResult = {
+val n1 = data1.count().toDouble
+val n2 = data2.count().toDouble
+val isSample1 = true // identifier for sample 1, needed after co-sort
+// combine identified samples
+val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, 
!isSample1))
+// co-sort and operate on each partition
+val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions 
{ part =>
+  searchTwoSampleCandidates(part, n1, n2) // local extrema
+}.collect()
+val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: 
global extreme
+evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts from each sample 
within one partition for
+   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition 
of the co-sorted RDDs,
+   *each element is additionally tagged with a boolean 
flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element 
is an unadjusted minimum
+   *distance , the second is an unadjusted maximum distance (both 
of which will later
+   *be adjusted by a constant to account for elements in prior 
partitions), and a
+   *count corresponding to the numerator of the adjustment 
constant coming from this
+   *partition
+   */
+  private def searchTwoSampleCandidates(
+  partData: Iterator[(Double, Boolean)],
+  n1: Double,
+  n2: Double)
+: Iterator[(Double, Double, Double)] = {
--- End diff --

This can go on the line above
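
i.e.:

      n2: Double): Iterator[(Double, Double, Double)] = {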


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939624
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
 ---
@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends 
Logging {
 val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
 new KolmogorovSmirnovTestResult(pval, ksStat, 
NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which 
tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return 
[[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): 
KolmogorovSmirnovTestResult = {
+val n1 = data1.count().toDouble
+val n2 = data2.count().toDouble
+val isSample1 = true // identifier for sample 1, needed after co-sort
+// combine identified samples
+val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, 
!isSample1))
+// co-sort and operate on each partition
+val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions 
{ part =>
+  searchTwoSampleCandidates(part, n1, n2) // local extrema
+}.collect()
+val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: 
global extreme
+evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts from each sample 
within one partition for
+   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition 
of the co-sorted RDDs,
+   *each element is additionally tagged with a boolean 
flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element 
is an unadjusted minimum
+   *distance , the second is an unadjusted maximum distance (both 
of which will later
+   *be adjusted by a constant to account for elements in prior 
partitions), and a
+   *count corresponding to the numerator of the adjustment 
constant coming from this
+   *partition
+   */
+  private def searchTwoSampleCandidates(
+  partData: Iterator[(Double, Boolean)],
+  n1: Double,
+  n2: Double)
+: Iterator[(Double, Double, Double)] = {
+// fold accumulator: local minimum, local maximum, index for sample 1, 
index for sample2
+case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int)
--- End diff --

KS2Acc is a little cryptic.  Maybe `ExtremaPositions`?
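
e.g. (name only a suggestion):

    // running extrema of the CDF difference, plus how many elements of each sample
    // have been seen so far in this partition
    case class ExtremaPositions(min: Double, max: Double, ix1: Int, ix2: Int)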


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939648
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
 ---
@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends 
Logging {
 val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
 new KolmogorovSmirnovTestResult(pval, ksStat, 
NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which 
tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return 
[[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): 
KolmogorovSmirnovTestResult = {
+val n1 = data1.count().toDouble
+val n2 = data2.count().toDouble
+val isSample1 = true // identifier for sample 1, needed after co-sort
+// combine identified samples
+val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, 
!isSample1))
+// co-sort and operate on each partition
+val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions 
{ part =>
+  searchTwoSampleCandidates(part, n1, n2) // local extrema
+}.collect()
+val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: 
global extreme
+evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts from each sample 
within one partition for
+   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition 
of the co-sorted RDDs,
+   *each element is additionally tagged with a boolean 
flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element 
is an unadjusted minimum
+   *distance , the second is an unadjusted maximum distance (both 
of which will later
--- End diff --

extra space before comma


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939466
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala
 ---
@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends 
Logging {
 val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
 new KolmogorovSmirnovTestResult(pval, ksStat, 
NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which 
tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return 
[[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): 
KolmogorovSmirnovTestResult = {
+val n1 = data1.count().toDouble
+val n2 = data2.count().toDouble
+val isSample1 = true // identifier for sample 1, needed after co-sort
+// combine identified samples
+val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, 
!isSample1))
+// co-sort and operate on each partition
+val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions 
{ part =>
+  searchTwoSampleCandidates(part, n1, n2) // local extrema
+}.collect()
+val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: 
global extreme
+evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts from each sample 
within one partition for
+   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition 
of the co-sorted RDDs,
+   *each element is additionally tagged with a boolean 
flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element 
is an unadjusted minimum
+   *distance , the second is an unadjusted maximum distance (both 
of which will later
+   *be adjusted by a constant to account for elements in prior 
partitions), and a
--- End diff --

and the third is


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7075#discussion_r35939406
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala 
---
@@ -196,4 +196,18 @@ object Statistics {
 : KolmogorovSmirnovTestResult = {
 KolmogorovSmirnovTest.testOneSample(data, distName, params: _*)
   }
+
+  /**
+   * Perform a two-sample, two-sided Kolmogorov-Smirnov test for 
probability distribution equality
+   * The null hypothesis corresponds to both samples coming from the same 
distribution
--- End diff --

"corresponds" is a little weird.  Maybe just "The null hypothesis is that 
both samples come from the same distribution."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...

2015-07-30 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7075#issuecomment-126536916
  
@mengxr @josepablocam oops thought it was still a WIP for some reason.  
Just took a pass.  It looks mostly done - I just had a bunch of nits and a test 
request. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9375] Make sure the total number of exe...

2015-07-28 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7716#issuecomment-125735637
  
@KaiXinXiaoLei we fixed a couple issues that had this symptom in 1.3.  Are 
you definitely running with a version that's 1.4 or later?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-27 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6394#issuecomment-125372128
  
Great.  LGTM as well.  Just merged this.  Thanks @jerryshao!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-9388] [yarn] Make executor info log mes...

2015-07-27 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7706#discussion_r35601139
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala ---
@@ -86,10 +86,15 @@ class ExecutorRunnable(
 val commands = prepareCommand(masterAddress, slaveId, hostname, 
executorMemory, executorCores,
   appId, localResources)
 
-logInfo(s"Setting up executor with environment: $env")
-logInfo("Setting up executor with commands: " + commands)
-ctx.setCommands(commands)
+
logInfo(===)
+logInfo("Yarn executor launch context:")
--- End diff --

Nit: capitalize all of YARN


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-26 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6394#issuecomment-125083816
  
@kayousterhout do the DAGScheduler changes look good to you after 
@jerryshao 's last pass?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-24 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35399233
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -28,7 +28,10 @@ private[spark] trait ExecutorAllocationClient {
* This can result in canceling pending requests or filing additional 
requests.
* @return whether the request is acknowledged by the cluster manager.
*/
-  private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+  private[spark] def requestTotalExecutors(
+  numExecutors: Int,
--- End diff --

Mind adding the above text to the documentation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-23 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35293276
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -526,6 +537,19 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumTasks(stageId) = numTasks
 allocationManager.onSchedulerBacklogged()
+
+var numTasksPending = 0
--- End diff --

Ah just looked deeper into the code and it seems like localities are often 
empty.  So nothing to be done here, sorry for the noise.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-23 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r35344727
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala 
---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import scala.annotation.varargs
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson-Darling (AD) test, similarly to the Kolmogorov-Smirnov 
(KS) test, tests whether the
+ * data follow a given theoretical distribution. It should be used with 
continuous data and
+ * assumes that no repeated values occur (the presence of ties can affect 
the validity of the test).
+ * The AD test provides an alternative to the KS test. Namely, it is better
+ * suited to identify departures from the theoretical distribution at the 
tails.
+ * It is worth noting that the the AD test's critical values depend on the
+ * distribution being tested against. The AD statistic is defined as
+ * {{{
+ * A^2 = -N - \frac{1}{N}\sum_{i = 0}^{N} (2i + 1)(\ln{\Phi{(x_i)}} + 
\ln{(1 - \Phi{(x_{N+1-i})})
+ * }}}
+ * where {{{\Phi}}} is the CDF of the given distribution and `N` is the 
sample size.
+ * For more information 
@see[[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object AndersonDarlingTest extends Logging {
+
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val OneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * AndersonDarlingTheoreticalDist is a trait that every distribution 
used in an AD test must
+   * extend. The rationale for this is that the AD test has 
distribution-dependent critical values,
+   * and by requiring extension of this trait we guarantee that future 
additional distributions
+   * make sure to add the appropriate critical values (CVs) (or at least 
acknowledge
+   * that they should be added)
+   */
+  sealed trait AndersonDarlingTheoreticalDist extends Serializable {
+val params: Seq[Double]  // parameters used to initialized the 
distribution
+
+def cdf(x: Double): Double // calculate the cdf under the given 
distribution for value x
+
+def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, 
adjusted for sample size
+  }
+
+  /**
+   * Critical values and adjustments for distributions sourced from
+   * 
[[http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs-
+   * test.pdf]]
+   * 
[[https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017]], 
which in turn
+   * references:
+   *
+   * Stephens, M. A. (1974). EDF Statistics for Goodness of Fit and
+   * Some Comparisons, Journal of the American Statistical Association,
+   * Vol. 69, pp. 730-737.
+   *
+   * Stephens, M. A. (1976). Asymptotic Results for Goodness-of-Fit
+   * Statistics with Unknown Parameters, Annals of Statistics, Vol. 4,
+   * pp. 357-369.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit for the Extreme Value
+   * Distribution, Biometrika, Vol. 64, pp. 583-588.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit with Special Reference
+   * to Tests for Exponentiality , Technical Report No. 262,
+   * Department of Statistics, Stanford University, Stanford, CA.
+   *
+   * Stephens, M. A. (1979). Tests of Fit for the Logistic Distribution
+   * Based on the Empirical Distribution Function, Biometrika, Vol. 66,
+   * pp. 591-595.
+   */
+
+  // Exponential distribution
+  class AndersonDarlingExponential(val params: Seq[Double]) extends

[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-23 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r35343248
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala 
---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import scala.annotation.varargs
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson-Darling (AD) test, similarly to the Kolmogorov-Smirnov 
(KS) test, tests whether the
+ * data follow a given theoretical distribution. It should be used with 
continuous data and
+ * assumes that no repeated values occur (the presence of ties can affect 
the validity of the test).
+ * The AD test provides an alternative to the KS test. Namely, it is better
+ * suited to identify departures from the theoretical distribution at the 
tails.
+ * It is worth noting that the the AD test's critical values depend on the
+ * distribution being tested against. The AD statistic is defined as
+ * {{{
+ * A^2 = -N - \frac{1}{N}\sum_{i = 0}^{N} (2i + 1)(\ln{\Phi{(x_i)}} + 
\ln{(1 - \Phi{(x_{N+1-i})})
+ * }}}
+ * where {{{\Phi}}} is the CDF of the given distribution and `N` is the 
sample size.
+ * For more information 
@see[[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object AndersonDarlingTest extends Logging {
+
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val OneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * AndersonDarlingTheoreticalDist is a trait that every distribution 
used in an AD test must
+   * extend. The rationale for this is that the AD test has 
distribution-dependent critical values,
+   * and by requiring extension of this trait we guarantee that future 
additional distributions
+   * make sure to add the appropriate critical values (CVs) (or at least 
acknowledge
+   * that they should be added)
+   */
+  sealed trait AndersonDarlingTheoreticalDist extends Serializable {
+val params: Seq[Double]  // parameters used to initialized the 
distribution
--- End diff --

Nit: put comments above, as opposed to after, these class members


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-23 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7278#issuecomment-124167898
  
This LGTM after some cosmetic changes.  @mengxr what do you think?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-23 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r35342576
  
--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala 
---
@@ -196,4 +196,33 @@ object Statistics {
 : KolmogorovSmirnovTestResult = {
 KolmogorovSmirnovTest.testOneSample(data, distName, params: _*)
   }
+
+  /**
+   * Conduct a 1-sample Anderson-Darling test for the null hypothesis that 
the data
+   * comes from a given theoretical distribution. The Anderson-Darling 
test is an alternative
+   * to the Kolmogorov-Smirnov test, and is more adequate at identifying 
departures from the
+   * theoretical distribution at the tails. The implementation returns an
+   * `AndersonDarlingTestResult`, which includes the statistic, the 
critical values at varying
+   * significance levels, and the null hypothesis. Note that the critical 
values are calculated
+   * assuming the parameters have been calculated from the data sample.
+   * If the parameters for the theoretical distribution are not in a valid 
domain, throws an
+   * exception.
+   * @param data the data to be tested
+   * @param distName name of the theoretical distribution to test against. 
Currently
+   *supports Normal (norm), Exponential (exp), Gumbel 
(gumbel),
+   *Logistic (logistic), and Weibull (weibull) 
distributions
+   * @param params provides the parameters for the theoretical 
distribution.
+   *   The order of parameters are as follow
+   *   Normal - [mu, sigma] (location, scale)
+   *   Exponential - [1 / lambda] (scale)
+   *   Gumbel - [mu, beta] (location, scale)
+   *   Logistic - [mu, s] (location, scale)
+   *   Weibull - [lambda, k]  (scale, shape)
+   * @return
--- End diff --

Give something short for @return
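
e.g. something like:

   * @return [[org.apache.spark.mllib.stat.test.AndersonDarlingTestResult]] with the test
   *         statistic, the critical values at varying significance levels, and the null
   *         hypothesis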


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-23 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r35343649
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala 
---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import scala.annotation.varargs
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson-Darling (AD) test, similarly to the Kolmogorov-Smirnov 
(KS) test, tests whether the
+ * data follow a given theoretical distribution. It should be used with 
continuous data and
+ * assumes that no repeated values occur (the presence of ties can affect 
the validity of the test).
+ * The AD test provides an alternative to the KS test. Namely, it is better
+ * suited to identify departures from the theoretical distribution at the 
tails.
+ * It is worth noting that the the AD test's critical values depend on the
+ * distribution being tested against. The AD statistic is defined as
+ * {{{
+ * A^2 = -N - \frac{1}{N}\sum_{i = 0}^{N} (2i + 1)(\ln{\Phi{(x_i)}} + 
\ln{(1 - \Phi{(x_{N+1-i})})
+ * }}}
+ * where {{{\Phi}}} is the CDF of the given distribution and `N` is the 
sample size.
+ * For more information 
@see[[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object AndersonDarlingTest extends Logging {
+
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val OneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * AndersonDarlingTheoreticalDist is a trait that every distribution 
used in an AD test must
+   * extend. The rationale for this is that the AD test has 
distribution-dependent critical values,
+   * and by requiring extension of this trait we guarantee that future 
additional distributions
+   * make sure to add the appropriate critical values (CVs) (or at least 
acknowledge
+   * that they should be added)
+   */
+  sealed trait AndersonDarlingTheoreticalDist extends Serializable {
+val params: Seq[Double]  // parameters used to initialized the 
distribution
+
+def cdf(x: Double): Double // calculate the cdf under the given 
distribution for value x
+
+def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, 
adjusted for sample size
--- End diff --

I'd call this getCriticalValues for clarity
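
i.e.:

    // return the appropriate critical values, adjusted for sample size
    def getCriticalValues(n: Double): Map[Double, Double]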


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-23 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r35344003
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala 
---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import scala.annotation.varargs
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson-Darling (AD) test, similarly to the Kolmogorov-Smirnov 
(KS) test, tests whether the
+ * data follow a given theoretical distribution. It should be used with 
continuous data and
+ * assumes that no repeated values occur (the presence of ties can affect 
the validity of the test).
+ * The AD test provides an alternative to the KS test. Namely, it is better
+ * suited to identify departures from the theoretical distribution at the 
tails.
+ * It is worth noting that the the AD test's critical values depend on the
+ * distribution being tested against. The AD statistic is defined as
+ * {{{
+ * A^2 = -N - \frac{1}{N}\sum_{i = 0}^{N} (2i + 1)(\ln{\Phi{(x_i)}} + 
\ln{(1 - \Phi{(x_{N+1-i})})
+ * }}}
+ * where {{{\Phi}}} is the CDF of the given distribution and `N` is the 
sample size.
+ * For more information 
@see[[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object AndersonDarlingTest extends Logging {
+
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val OneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * AndersonDarlingTheoreticalDist is a trait that every distribution 
used in an AD test must
+   * extend. The rationale for this is that the AD test has 
distribution-dependent critical values,
+   * and by requiring extension of this trait we guarantee that future 
additional distributions
+   * make sure to add the appropriate critical values (CVs) (or at least 
acknowledge
+   * that they should be added)
+   */
+  sealed trait AndersonDarlingTheoreticalDist extends Serializable {
+val params: Seq[Double]  // parameters used to initialized the 
distribution
+
+def cdf(x: Double): Double // calculate the cdf under the given 
distribution for value x
+
+def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, 
adjusted for sample size
+  }
+
+  /**
+   * Critical values and adjustments for distributions sourced from
+   * 
[[http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs-
+   * test.pdf]]
+   * 
[[https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017]], 
which in turn
+   * references:
+   *
+   * Stephens, M. A. (1974). EDF Statistics for Goodness of Fit and
+   * Some Comparisons, Journal of the American Statistical Association,
+   * Vol. 69, pp. 730-737.
+   *
+   * Stephens, M. A. (1976). Asymptotic Results for Goodness-of-Fit
+   * Statistics with Unknown Parameters, Annals of Statistics, Vol. 4,
+   * pp. 357-369.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit for the Extreme Value
+   * Distribution, Biometrika, Vol. 64, pp. 583-588.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit with Special Reference
+   * to Tests for Exponentiality , Technical Report No. 262,
+   * Department of Statistics, Stanford University, Stanford, CA.
+   *
+   * Stephens, M. A. (1979). Tests of Fit for the Logistic Distribution
+   * Based on the Empirical Distribution Function, Biometrika, Vol. 66,
+   * pp. 591-595.
+   */
+
+  // Exponential distribution
+  class AndersonDarlingExponential(val params: Seq[Double]) extends
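
As a rough sketch only -- not the PR's actual implementation, and ignoring the `sealed`
restriction since this is illustration -- a distribution wrapper satisfying the trait above
might look like the following, reusing the commons-math3 import from the diff. The critical
values and the small-sample adjustment are placeholders standing in for the Stephens values
cited in the scaladoc.

    // Sketch only: illustrates the cdf/getCVs contract of AndersonDarlingTheoreticalDist.
    class ExponentialDistSketch(val params: Seq[Double]) extends AndersonDarlingTheoreticalDist {
      // params.head is taken to be the mean of the fitted exponential
      private val dist = new ExponentialDistribution(params.head)

      def cdf(x: Double): Double = dist.cumulativeProbability(x)

      // Placeholder critical values keyed by significance level, scaled by a placeholder
      // small-sample adjustment; the real values come from the Stephens references above.
      def getCVs(n: Double): Map[Double, Double] = {
        val adjustment = 1.0 + 0.6 / n
        Map(0.15 -> 0.922, 0.10 -> 1.078, 0.05 -> 1.341, 0.025 -> 1.606, 0.01 -> 1.957)
          .map { case (level, cv) => level -> cv * adjustment }
      }
    }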

[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-23 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r35344145
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala 
---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import scala.annotation.varargs
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson-Darling (AD) test, similarly to the Kolmogorov-Smirnov 
(KS) test, tests whether the
+ * data follow a given theoretical distribution. It should be used with 
continuous data and
+ * assumes that no repeated values occur (the presence of ties can affect 
the validity of the test).
+ * The AD test provides an alternative to the KS test. Namely, it is better
+ * suited to identify departures from the theoretical distribution at the 
tails.
+ * It is worth noting that the AD test's critical values depend on the
+ * distribution being tested against. The AD statistic is defined as
+ * {{{
+ * A^2 = -N - \frac{1}{N}\sum_{i = 0}^{N} (2i + 1)(\ln{\Phi{(x_i)}} + \ln{(1 - \Phi{(x_{N+1-i})})})
+ * }}}
+ * where {{{\Phi}}} is the CDF of the given distribution and `N` is the 
sample size.
+ * For more information 
@see[[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object AndersonDarlingTest extends Logging {
+
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val OneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * AndersonDarlingTheoreticalDist is a trait that every distribution 
used in an AD test must
+   * extend. The rationale for this is that the AD test has 
distribution-dependent critical values,
+   * and by requiring extension of this trait we guarantee that future 
additional distributions
+   * make sure to add the appropriate critical values (CVs) (or at least 
acknowledge
+   * that they should be added)
+   */
+  sealed trait AndersonDarlingTheoreticalDist extends Serializable {
+val params: Seq[Double]  // parameters used to initialize the 
distribution
+
+def cdf(x: Double): Double // calculate the cdf under the given 
distribution for value x
+
+def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, 
adjusted for sample size
+  }
+
+  /**
+   * Critical values and adjustments for distributions sourced from
+   * 
[[http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs-
+   * test.pdf]]
+   * 
[[https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017]], 
which in turn
+   * references:
+   *
+   * Stephens, M. A. (1974). EDF Statistics for Goodness of Fit and
+   * Some Comparisons, Journal of the American Statistical Association,
+   * Vol. 69, pp. 730-737.
+   *
+   * Stephens, M. A. (1976). Asymptotic Results for Goodness-of-Fit
+   * Statistics with Unknown Parameters, Annals of Statistics, Vol. 4,
+   * pp. 357-369.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit for the Extreme Value
+   * Distribution, Biometrika, Vol. 64, pp. 583-588.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit with Special Reference
+   * to Tests for Exponentiality , Technical Report No. 262,
+   * Department of Statistics, Stanford University, Stanford, CA.
+   *
+   * Stephens, M. A. (1979). Tests of Fit for the Logistic Distribution
+   * Based on the Empirical Distribution Function, Biometrika, Vol. 66,
+   * pp. 591-595.
+   */
+
+  // Exponential distribution
+  class AndersonDarlingExponential(val params: Seq[Double]) extends
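
A minimal, non-distributed sketch of the statistic in the scaladoc above, using 0-based
indexing over a sorted local array; the PR's actual RDD-based implementation is not shown
in this excerpt.

    // Sketch: A^2 for a sorted sample against a fitted CDF (local, not the PR's RDD version).
    // Assumes 0 < cdf(x) < 1 for every sample point, so the logs are finite.
    def adStatistic(sorted: Array[Double], cdf: Double => Double): Double = {
      val n = sorted.length
      val s = (0 until n).map { i =>
        (2 * i + 1) * (math.log(cdf(sorted(i))) + math.log(1 - cdf(sorted(n - 1 - i))))
      }.sum
      -n - s / n
    }

    // e.g. against an exponential fitted by its sample mean:
    //   val dist = new ExponentialDistribution(sample.sum / sample.length)
    //   adStatistic(sample.sorted, x => dist.cumulativeProbability(x))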

[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-23 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r35344395
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala 
---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import scala.annotation.varargs
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson-Darling (AD) test, similarly to the Kolmogorov-Smirnov 
(KS) test, tests whether the
+ * data follow a given theoretical distribution. It should be used with 
continuous data and
+ * assumes that no repeated values occur (the presence of ties can affect 
the validity of the test).
+ * The AD test provides an alternative to the KS test. Namely, it is better
+ * suited to identify departures from the theoretical distribution at the 
tails.
+ * It is worth noting that the AD test's critical values depend on the
+ * distribution being tested against. The AD statistic is defined as
+ * {{{
+ * A^2 = -N - \frac{1}{N}\sum_{i = 0}^{N} (2i + 1)(\ln{\Phi{(x_i)}} + \ln{(1 - \Phi{(x_{N+1-i})})})
+ * }}}
+ * where {{{\Phi}}} is the CDF of the given distribution and `N` is the 
sample size.
+ * For more information 
@see[[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object AndersonDarlingTest extends Logging {
+
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val OneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * AndersonDarlingTheoreticalDist is a trait that every distribution 
used in an AD test must
+   * extend. The rationale for this is that the AD test has 
distribution-dependent critical values,
+   * and by requiring extension of this trait we guarantee that future 
additional distributions
+   * make sure to add the appropriate critical values (CVs) (or at least 
acknowledge
+   * that they should be added)
+   */
+  sealed trait AndersonDarlingTheoreticalDist extends Serializable {
+val params: Seq[Double]  // parameters used to initialize the 
distribution
+
+def cdf(x: Double): Double // calculate the cdf under the given 
distribution for value x
+
+def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, 
adjusted for sample size
+  }
+
+  /**
+   * Critical values and adjustments for distributions sourced from
+   * 
[[http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs-
+   * test.pdf]]
+   * 
[[https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017]], 
which in turn
+   * references:
+   *
+   * Stephens, M. A. (1974). EDF Statistics for Goodness of Fit and
+   * Some Comparisons, Journal of the American Statistical Association,
+   * Vol. 69, pp. 730-737.
+   *
+   * Stephens, M. A. (1976). Asymptotic Results for Goodness-of-Fit
+   * Statistics with Unknown Parameters, Annals of Statistics, Vol. 4,
+   * pp. 357-369.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit for the Extreme Value
+   * Distribution, Biometrika, Vol. 64, pp. 583-588.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit with Special Reference
+   * to Tests for Exponentiality , Technical Report No. 262,
+   * Department of Statistics, Stanford University, Stanford, CA.
+   *
+   * Stephens, M. A. (1979). Tests of Fit for the Logistic Distribution
+   * Based on the Empirical Distribution Function, Biometrika, Vol. 66,
+   * pp. 591-595.
+   */
+
+  // Exponential distribution
+  class AndersonDarlingExponential(val params: Seq[Double]) extends

[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-23 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r35344356
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala 
---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import scala.annotation.varargs
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson-Darling (AD) test, similarly to the Kolmogorov-Smirnov 
(KS) test, tests whether the
+ * data follow a given theoretical distribution. It should be used with 
continuous data and
+ * assumes that no repeated values occur (the presence of ties can affect 
the validity of the test).
+ * The AD test provides an alternative to the KS test. Namely, it is better
+ * suited to identify departures from the theoretical distribution at the 
tails.
+ * It is worth noting that the AD test's critical values depend on the
+ * distribution being tested against. The AD statistic is defined as
+ * {{{
+ * A^2 = -N - \frac{1}{N}\sum_{i = 0}^{N} (2i + 1)(\ln{\Phi{(x_i)}} + \ln{(1 - \Phi{(x_{N+1-i})})})
+ * }}}
+ * where {{{\Phi}}} is the CDF of the given distribution and `N` is the 
sample size.
+ * For more information 
@see[[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object AndersonDarlingTest extends Logging {
+
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val OneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * AndersonDarlingTheoreticalDist is a trait that every distribution 
used in an AD test must
+   * extend. The rationale for this is that the AD test has 
distribution-dependent critical values,
+   * and by requiring extension of this trait we guarantee that future 
additional distributions
+   * make sure to add the appropriate critical values (CVs) (or at least 
acknowledge
+   * that they should be added)
+   */
+  sealed trait AndersonDarlingTheoreticalDist extends Serializable {
+val params: Seq[Double]  // parameters used to initialize the 
distribution
+
+def cdf(x: Double): Double // calculate the cdf under the given 
distribution for value x
+
+def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, 
adjusted for sample size
+  }
+
+  /**
+   * Critical values and adjustments for distributions sourced from
+   * 
[[http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs-
+   * test.pdf]]
+   * 
[[https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017]], 
which in turn
+   * references:
+   *
+   * Stephens, M. A. (1974). EDF Statistics for Goodness of Fit and
+   * Some Comparisons, Journal of the American Statistical Association,
+   * Vol. 69, pp. 730-737.
+   *
+   * Stephens, M. A. (1976). Asymptotic Results for Goodness-of-Fit
+   * Statistics with Unknown Parameters, Annals of Statistics, Vol. 4,
+   * pp. 357-369.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit for the Extreme Value
+   * Distribution, Biometrika, Vol. 64, pp. 583-588.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit with Special Reference
+   * to Tests for Exponentiality , Technical Report No. 262,
+   * Department of Statistics, Stanford University, Stanford, CA.
+   *
+   * Stephens, M. A. (1979). Tests of Fit for the Logistic Distribution
+   * Based on the Empirical Distribution Function, Biometrika, Vol. 66,
+   * pp. 591-595.
+   */
+
+  // Exponential distribution
+  class AndersonDarlingExponential(val params: Seq[Double]) extends

[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-23 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r35343600
  
--- Diff: 
mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala 
---
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import scala.annotation.varargs
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, 
GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson-Darling (AD) test, similarly to the Kolmogorov-Smirnov 
(KS) test, tests whether the
+ * data follow a given theoretical distribution. It should be used with 
continuous data and
+ * assumes that no repeated values occur (the presence of ties can affect 
the validity of the test).
+ * The AD test provides an alternative to the KS test. Namely, it is better
+ * suited to identify departures from the theoretical distribution at the 
tails.
+ * It is worth noting that the AD test's critical values depend on the
+ * distribution being tested against. The AD statistic is defined as
+ * {{{
+ * A^2 = -N - \frac{1}{N}\sum_{i = 0}^{N} (2i + 1)(\ln{\Phi{(x_i)}} + \ln{(1 - \Phi{(x_{N+1-i})})})
+ * }}}
+ * where {{{\Phi}}} is the CDF of the given distribution and `N` is the 
sample size.
+ * For more information 
@see[[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object AndersonDarlingTest extends Logging {
+
+  object NullHypothesis extends Enumeration {
+type NullHypothesis = Value
+val OneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * AndersonDarlingTheoreticalDist is a trait that every distribution 
used in an AD test must
+   * extend. The rationale for this is that the AD test has 
distribution-dependent critical values,
+   * and by requiring extension of this trait we guarantee that future 
additional distributions
+   * make sure to add the appropriate critical values (CVs) (or at least 
acknowledge
+   * that they should be added)
+   */
+  sealed trait AndersonDarlingTheoreticalDist extends Serializable {
+val params: Seq[Double]  // parameters used to initialize the 
distribution
+
+def cdf(x: Double): Double // calculate the cdf under the given 
distribution for value x
+
+def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, 
adjusted for sample size
+  }
+
+  /**
+   * Critical values and adjustments for distributions sourced from
+   * 
[[http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs-
+   * test.pdf]]
+   * 
[[https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017]], 
which in turn
+   * references:
+   *
+   * Stephens, M. A. (1974). EDF Statistics for Goodness of Fit and
+   * Some Comparisons, Journal of the American Statistical Association,
+   * Vol. 69, pp. 730-737.
+   *
+   * Stephens, M. A. (1976). Asymptotic Results for Goodness-of-Fit
+   * Statistics with Unknown Parameters, Annals of Statistics, Vol. 4,
+   * pp. 357-369.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit for the Extreme Value
+   * Distribution, Biometrika, Vol. 64, pp. 583-588.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit with Special Reference
+   * to Tests for Exponentiality , Technical Report No. 262,
+   * Department of Statistics, Stanford University, Stanford, CA.
+   *
+   * Stephens, M. A. (1979). Tests of Fit for the Logistic Distribution
+   * Based on the Empirical Distribution Function, Biometrika, Vol. 66,
+   * pp. 591-595.
+   */
+
+  // Exponential distribution
+  class AndersonDarlingExponential(val params: Seq[Double]) extends

[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-23 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7278#discussion_r35344486
  
--- Diff: docs/mllib-statistics.md ---
@@ -456,6 +456,39 @@ val testResult2 = 
Statistics.kolmogorovSmirnovTest(data, myCDF)
 </div>
 </div>
 
+MLlib also provides a 1-sample Anderson-Darling test, which allows users 
to test a given 
+sample of data, in the form of `RDD[Double]`, against one of various 
continuous distributions.
+The statistic can be used to test the null hypothesis that the data come 
from that given 
+distribution. Thus the Anderson-Darling test is a goodness-of-fit for 
continuous
--- End diff --

goodness-of-fit *test*
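
The exact user-facing API proposed by this PR is not visible in the excerpt; purely as a
hypothetical illustration of the usage described above (the method and parameter names below
are assumptions, mirroring the kolmogorovSmirnovTest call quoted in the diff context):

    // Hypothetical usage only -- the PR's real method name and signature may differ.
    val data: RDD[Double] = sc.parallelize(Seq(0.2, 0.5, 0.9, 1.3, 2.1))
    val adResult = Statistics.andersonDarlingTest(data, "exp")  // test against a fitted exponential
    println(adResult)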


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-22 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35291311
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -526,6 +537,19 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumTasks(stageId) = numTasks
 allocationManager.onSchedulerBacklogged()
+
+var numTasksPending = 0
--- End diff --

What situation does that occur in?  When a stage has some tasks with 
locality preferences and some tasks without?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35147210
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -526,6 +537,19 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumTasks(stageId) = numTasks
 allocationManager.onSchedulerBacklogged()
+
+var numTasksPending = 0
--- End diff --

You can take this out and use `taskLocalityPreferences.size()`, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35146872
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---
@@ -28,7 +28,10 @@ private[spark] trait ExecutorAllocationClient {
* This can result in canceling pending requests or filing additional 
requests.
* @return whether the request is acknowledged by the cluster manager.
*/
-  private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+  private[spark] def requestTotalExecutors(
+  numExecutors: Int,
--- End diff --

To be as clear as possible, can you update the documentation for this 
method to the following.  Also, change `localityAwarePendingTasks` to 
`localityAwareTasks` because the tasks need not be pending. 

---

Update the cluster manager on our scheduling needs.  Three bits of 
information are included to help it make decisions.

@param numExecutors
  The total number of executors we'd like to have.  The cluster manager 
shouldn't kill any running executor to reach this number, but, if all existing 
executors were to die, this is the number of executors we'd want to be 
allocated.

@param localityAwareTasks
  The number of tasks in all active stages that have locality 
preferences.  This includes running, pending, and completed tasks.

@param hostToLocalTaskCount
  A map of hosts to the number of tasks from all active stages that would 
like to run on that host.  This includes running, pending, and 
completed tasks.
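
Putting those suggestions together, the signature being asked for would presumably look
roughly like this (doc text abridged; a sketch of the rename, not necessarily the final code):

    private[spark] def requestTotalExecutors(
        numExecutors: Int,
        localityAwareTasks: Int,
        hostToLocalTaskCount: Map[String, Int]): Boolean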


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35145466
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
+   * numbers running on it, used as hints for 
container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  hostToLocalTaskCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
+ *
+ * 4. If containers are existed and some localities are matched. For 
example if we have 1
+ * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and 
the expected containers
+ * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to 
(host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required 
containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If request container number (10) is more than newly required 
containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers are existed and existing localities can fully cover

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35147681
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -526,6 +537,19 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumTasks(stageId) = numTasks
 allocationManager.onSchedulerBacklogged()
+
+var numTasksPending = 0
--- End diff --

Is it possible for the localities to ever be empty?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35147827
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -526,6 +537,19 @@ private[spark] class ExecutorAllocationManager(
   allocationManager.synchronized {
 stageIdToNumTasks(stageId) = numTasks
 allocationManager.onSchedulerBacklogged()
+
--- End diff --

Add a short comment here: // Compute the number of tasks requested by the 
stage on each host
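
For context, a sketch of what the block this comment refers to might look like, assuming the
stage info exposes taskLocalityPreferences: Seq[Seq[TaskLocation]]; the names and types here
are assumptions, not the PR's exact code.

    // Compute the number of tasks requested by the stage on each host
    var numTasksPending = 0
    val hostToLocalTaskCountPerStage = new scala.collection.mutable.HashMap[String, Int]()
    stageSubmitted.stageInfo.taskLocalityPreferences.foreach { locality =>
      if (locality.nonEmpty) {
        numTasksPending += 1
        locality.foreach { location =>
          val count = hostToLocalTaskCountPerStage.getOrElse(location.host, 0) + 1
          hostToLocalTaskCountPerStage(location.host) = count
        }
      }
    }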


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35148779
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -637,6 +662,24 @@ private[spark] class ExecutorAllocationManager(
 def isExecutorIdle(executorId: String): Boolean = {
   !executorIdToTaskIds.contains(executorId)
 }
+
+/**
+ * Get a tuple of (the number of task with locality preferences, a map 
where each pair is a
+ * node and the number of tasks that would like to be scheduled on 
that node).
+ */
--- End diff --

Mention "These hints are updated when stages arrive and complete, so are not up-to-date at task granularity within stages."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35149276
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
--- End diff --

Consolidate this with LocalityPreferredContainerPlacementStrategy because 
there are no other implementations.  I agree that it's often nice to have 
interfaces separate from their implementations, even if there's only a single 
implementation, but I haven't seen this generally practiced in Spark.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35151031
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -225,8 +245,11 @@ private[yarn] class YarnAllocator(
  logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " +
    s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead")
 
-  for (i <- 0 until missing) {
-val request = createContainerRequest(resource)
+  val containerLocalityPreferences = 
containerPlacementStrategy.localityOfRequestedContainers(
--- End diff --

It's possible that, since the last time we made container requests, stages 
have completed and been submitted, and that the localities at which we 
requested our pending executors no longer apply to our current needs.  Can we 
remove all outstanding container requests and add requests anew each time to 
avoid this?  Ideally we would run a benchmark to see how computationally 
expensive this is.  At the very least, can we add a TODO here explaining the 
problem and file a followup JIRA to address this?
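
For reference, a hedged sketch of the "cancel and re-request" idea, assuming the allocator
holds amClient: AMRMClient[ContainerRequest], a `resource` capability, and a single `priority`
used for executor requests; this is not the PR's code, and the exact bookkeeping (e.g. not
cancelling requests that already have allocated containers in flight) would need care.

    import scala.collection.JavaConverters._

    // Drop all outstanding requests, whose locality hints may be stale...
    val outstanding = amClient
      .getMatchingRequests(priority, ResourceRequest.ANY, resource)
      .asScala.flatMap(_.asScala).toList
    outstanding.foreach(amClient.removeContainerRequest)

    // ...then re-request with locality computed from the current pending tasks.
    containerLocalityPreferences.foreach { case ContainerLocalityPreferences(nodes, racks) =>
      amClient.addContainerRequest(new ContainerRequest(resource, nodes, racks, priority))
    }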



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35151608
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
+   * numbers running on it, used as hints for 
container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  hostToLocalTaskCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
--- End diff --

What exactly is meant by "matching localities"?
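
Concretely, case 1 above (18 containers requested, placement ratio 3 : 3 : 2 : 1) corresponds
to a result shaped roughly like the following, using the ContainerLocalityPreferences case
class from the diff; racks are left empty here purely for brevity.

    val prefs: Array[ContainerLocalityPreferences] =
      Array.fill(5)(ContainerLocalityPreferences(Array("host1", "host2", "host3", "host4"), Array.empty[String])) ++
      Array.fill(5)(ContainerLocalityPreferences(Array("host1", "host2", "host3"), Array.empty[String])) ++
      Array.fill(5)(ContainerLocalityPreferences(Array("host1", "host2"), Array.empty[String])) ++
      Array.fill(3)(ContainerLocalityPreferences(Array.empty[String], Array.empty[String]))  // no preference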


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35148929
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala ---
@@ -127,6 +127,16 @@ private[yarn] class YarnAllocator(
 }
   }
 
+  // A map to store preferred hostname and possible task numbers running 
on it.
+  private var hostToLocalTaskCounts: Map[String, Int] = Map.empty
+
+  // Locality required pending task number
--- End diff --

Number of tasks that have locality preferences in active stages


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35149314
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
+   * numbers running on it, used as hints for 
container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  hostToLocalTaskCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
+ *
+ * 4. If containers are existed and some localities are matched. For 
example if we have 1
+ * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and 
the expected containers
+ * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to 
(host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required 
containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If request container number (10) is more than newly required 
containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers are existed and existing localities can fully cover

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35151721
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
+   * numbers running on it, used as hints for 
container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  hostToLocalTaskCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
+ *
+ * 4. If containers are existed and some localities are matched. For 
example if we have 1
--- End diff --

"If containers are existed" -> "If containers exist"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/6394#issuecomment-123474754
  
I made a final pass on this for clarity issues.  I'm being nitpicky on some 
of the stuff, but I think it's important to be as clear as possible given that 
what's going on here is inherently pretty complicated.

@kayousterhout are you satisfied with the DAGScheduler changes?  If so, I 
think that after this batch of comments is addressed, the patch should be good 
to go.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35148005
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -637,6 +662,24 @@ private[spark] class ExecutorAllocationManager(
 def isExecutorIdle(executorId: String): Boolean = {
   !executorIdToTaskIds.contains(executorId)
 }
+
+/**
+ * Get a tuple of (the number of task with locality preferences, a map 
where each pair is a
+ * node and the number of tasks that would like to be scheduled on 
that node).
+ */
+def executorPlacementHints(): (Int, Map[String, Int]) =
+  allocationManager.synchronized {
+var localityAwarePendingTasks = 0
--- End diff --

IIUC, these tasks might be running or completed, so replace with 
`localityAwareTasks`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35151169
  
--- Diff: 
yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import org.scalatest.{BeforeAndAfterEach, Matchers}
+
+import org.apache.spark.SparkFunSuite
+
+class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers 
with BeforeAndAfterEach {
+
+  private val yarnAllocatorSuite = new YarnAllocatorSuite
--- End diff --

Ah ok my bad


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35151299
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
+   * numbers running on it, used as hints for 
container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  hostToLocalTaskCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
--- End diff --

`existed` -> `existing`
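
For readers skimming the quoted trait, a minimal toy implementation
(hypothetical, not part of this PR) makes the contract concrete: given a
container count and the host-to-pending-task hints, it returns one
ContainerLocalityPreferences per container. It assumes the trait and case
class quoted above are in scope.

    // Hypothetical sketch only: a naive strategy that asks for every container
    // on every hinted host and expresses no rack preference. The strategy in
    // this PR instead weights hosts by their share of locality-aware pending tasks.
    private[yarn] class NaivePlacementStrategy extends ContainerPlacementStrategy {
      def localityOfRequestedContainers(
          numContainer: Int,
          numLocalityAwarePendingTasks: Int,
          hostToLocalTaskCount: Map[String, Int]): Array[ContainerLocalityPreferences] = {
        val hintedHosts = hostToLocalTaskCount.keys.toArray
        Array.fill(numContainer)(
          ContainerLocalityPreferences(hintedHosts, Array.empty[String]))
      }
    }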


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35151397
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
+   * numbers running on it, used as hints for 
container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  hostToLocalTaskCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
--- End diff --

Replace this with "Consider a situation in which we have 20 tasks..."


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35152686
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
+   * numbers running on it, used as hints for 
container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  hostToLocalTaskCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
+ *
+ * 4. If containers are existed and some localities are matched. For 
example if we have 1
+ * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and 
the expected containers
+ * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to 
(host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required 
containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If request container number (10) is more than newly required 
containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers are existed and existing localities can fully cover

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35152962
  
--- Diff: 
yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala
 ---
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the License); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an AS IS BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.collection.mutable.ArrayBuffer
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.yarn.util.RackResolver
+
+import org.apache.spark.SparkConf
+
+private[yarn] case class ContainerLocalityPreferences(nodes: 
Array[String], racks: Array[String])
+
+private[yarn] trait ContainerPlacementStrategy {
+
+  /**
+   * Calculate each container's node locality and rack locality
+   * @param numContainer number of containers to calculate
+   * @param numLocalityAwarePendingTasks number of locality required 
pending tasks
+   * @param hostToLocalTaskCount a map to store the preferred hostname and 
possible task
+   * numbers running on it, used as hints for 
container allocation
+   * @return node localities and rack localities, each locality is an 
array of string,
+   * the length of localities is the same as number of containers
+   */
+  def localityOfRequestedContainers(
+  numContainer: Int,
+  numLocalityAwarePendingTasks: Int,
+  hostToLocalTaskCount: Map[String, Int]
+): Array[ContainerLocalityPreferences]
+}
+
+/**
+ * This strategy is calculating the optimal locality preferences of YARN 
containers by considering
+ * the node ratio of pending tasks, number of required cores/containers 
and and locality of current
+ * existed containers. The target of this algorithm is to maximize the 
number of tasks that
+ * would run locally.
+ *
+ * The details of this algorithm is described as below, if we have 20 
tasks which
+ * require (host1, host2, host3) and 10 tasks which require (host1, host2, 
host4),
+ * besides each container has 2 cores and cpus per task is 1, so the 
required container number is
+ * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10).
+ *
+ * 1. If requested container number (18) is more than the required 
container number (15):
+ *
+ * requests for 5 containers with nodes: (host1, host2, host3, host4)
+ * requests for 5 containers with nodes: (host1, host2, host3)
+ * requests for 5 containers with nodes: (host1, host2)
+ * requests for 3 containers with no locality preferences.
+ *
+ * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers 
with no locality
+ * preferences.
+ *
+ * 2. If requested container number (10) is less than or equal to the 
required container number
+ * (15):
+ *
+ * requests for 4 containers with nodes: (host1, host2, host3, host4)
+ * requests for 3 containers with nodes: (host1, host2, host3)
+ * requests for 3 containers with nodes: (host1, host2)
+ *
+ * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 
: 2 : 1)
+ *
+ * 3. If containers are existed but no matching localities, follow the 
method of 1 and 2.
+ *
+ * 4. If containers are existed and some localities are matched. For 
example if we have 1
+ * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and 
the expected containers
+ * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2),
+ * so the newly requested containers on each node would be updated to 
(host1: 4, host2: 4,
+ * host3: 3, host4: 1), 12 containers by total.
+ *
+ *   4.1 If requested container number (18) is more than newly required 
containers (12). Follow
+ *   method 1 with updated ratio 4 : 4 : 3 : 1.
+ *
+ *   4.2 If request container number (10) is more than newly required 
containers (12). Follow
+ *   method 2 with updated ratio 4 : 4 : 3 : 1.
+ *
+ * 5. If containers are existed and existing localities can fully cover
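
To make the numbers in the quoted doc comment concrete, here is a hedged
sketch (not code from the PR) that reproduces the required-container count
and the 3 : 3 : 2 : 1 host ratio from the 20-task and 10-task groups; the
host names and values come straight from the example above.

    // 20 tasks prefer host1-host3 and 10 tasks prefer host1, host2 and host4;
    // with 2 cores per container and 1 core per task, 30 tasks need 15 containers.
    val taskGroups = Seq(
      (20, Seq("host1", "host2", "host3")),
      (10, Seq("host1", "host2", "host4")))

    val hostToLocalTaskCount: Map[String, Int] = taskGroups
      .flatMap { case (n, hosts) => hosts.map(_ -> n) }
      .groupBy(_._1)
      .map { case (host, pairs) => host -> pairs.map(_._2).sum }

    val pendingTasks = taskGroups.map(_._1).sum                      // 30
    val coresPerContainer = 2
    val requiredContainers =
      (pendingTasks + coresPerContainer - 1) / coresPerContainer     // 15

    // hostToLocalTaskCount == Map(host1 -> 30, host2 -> 30, host3 -> 20, host4 -> 10),
    // i.e. the 3 : 3 : 2 : 1 placement ratio used in cases 1 and 2 above.
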

[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35148070
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -637,6 +662,24 @@ private[spark] class ExecutorAllocationManager(
 def isExecutorIdle(executorId: String): Boolean = {
   !executorIdToTaskIds.contains(executorId)
 }
+
+/**
+ * Get a tuple of (the number of task with locality preferences, a map 
where each pair is a
--- End diff --

`task` -> `tasks`
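
As an aside for readers of the quoted doc comment, a hedged sketch of the
tuple it describes (PendingTask and placementHints are illustrative
placeholders, not names from the PR): the first element counts the
locality-aware pending tasks, the second maps each preferred host to the
number of tasks that want it.

    // Hypothetical sketch; PendingTask stands in for whatever task description
    // the listener actually tracks.
    case class PendingTask(preferredHosts: Seq[String])

    def placementHints(pendingTasks: Seq[PendingTask]): (Int, Map[String, Int]) = {
      val localityAware = pendingTasks.filter(_.preferredHosts.nonEmpty)
      val hostToTaskCount = localityAware
        .flatMap(_.preferredHosts)
        .groupBy(identity)
        .map { case (host, occurrences) => host -> occurrences.size }
      (localityAware.size, hostToTaskCount)
    }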


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...

2015-07-21 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/6394#discussion_r35148227
  
--- Diff: 
core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---
@@ -295,7 +295,9 @@ private[spark] class ExecutorAllocationManager(
 
   // If the new target has not changed, avoid sending a message to the 
cluster manager
   if (numExecutorsTarget < oldNumExecutorsTarget) {
-client.requestTotalExecutors(numExecutorsTarget)
+val (localityAwarePendingTasks, preferredLocalities) = 
listener.executorPlacementHints()
--- End diff --

Can we cache the result of `executorPlacementHints` and only update it when 
stages come or go?
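
One possible shape for that caching, as a hedged sketch rather than the PR's
code (the class name and computeHintsFromPendingStages are placeholders):
recompute the hints in the stage lifecycle callbacks and let the allocation
path read the cached value instead of recomputing on every sync.

    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageCompleted,
      SparkListenerStageSubmitted}

    class CachedPlacementHintsListener extends SparkListener {

      @volatile private var cachedHints: (Int, Map[String, Int]) = (0, Map.empty)

      /** Cheap read path, called on every sync with the cluster manager. */
      def executorPlacementHints(): (Int, Map[String, Int]) = cachedHints

      override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit =
        cachedHints = computeHintsFromPendingStages()

      override def onStageCompleted(event: SparkListenerStageCompleted): Unit =
        cachedHints = computeHintsFromPendingStages()

      // Placeholder for the existing aggregation over pending stages' locality
      // preferences.
      private def computeHintsFromPendingStages(): (Int, Map[String, Int]) =
        (0, Map.empty)
    }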


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...

2015-07-20 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7278#issuecomment-123123557
  
jenkins, retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8996] [MLlib] [PySpark] Python API for ...

2015-07-17 Thread sryza
Github user sryza commented on the pull request:

https://github.com/apache/spark/pull/7430#issuecomment-122346380
  
LGTM modulo a small comment


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request: [SPARK-8996] [MLlib] [PySpark] Python API for ...

2015-07-17 Thread sryza
Github user sryza commented on a diff in the pull request:

https://github.com/apache/spark/pull/7430#discussion_r34910046
  
--- Diff: python/pyspark/mllib/stat/_statistics.py ---
@@ -238,6 +242,60 @@ def chiSqTest(observed, expected=None):
 jmodel = callMLlibFunc("chiSqTest", 
_convert_to_vector(observed), expected)
 return ChiSqTestResult(jmodel)
 
+@staticmethod
+@ignore_unicode_prefix
+def kolmogorovSmirnovTest(data, distName="norm", *params):
+
+.. note:: Experimental
+
+Performs the Kolmogorov Smirnov (KS) test for data sampled from
+a continuous distribution. It tests the null hypothesis that
+the data is generated from a particular distribution.
+
+The given data is sorted and the Empirical Cumulative
+Distribution Function (ECDF) is calculated
+which for a given point is the number of points having a CDF
+value lesser than it divided by the total number of points.
+
+Since the data is sorted, this is a step function
+that rises by (1 / length of data) for every ordered point.
+
+The KS statistic gives us the maximum distance between the
+ECDF and the CDF. Intuitively if this statistic is large, the
+probability that the null hypothesis is true becomes small.
+For specific details of the implementation, please have a look
+at the Scala documentation.
+
+:param data: RDD, samples from the data
+:param distName: string, currently only "norm" is supported.
+ (Normal distribution) to calculate the
+ theoretical distribution of the data.
+:param params: additional values which need to be provided for
+   a certain distribution.
+   If not provided, the default values are used.
+:return: KolmogorovSmirnovTestResult object containing the test
+ statistic, degrees of freedom, p-value,
+ the method used, and the null hypothesis.
+
+ >>> kstest = Statistics.kolmogorovSmirnovTest
--- End diff --

Small thing: can we include an example that passes parameters in?
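
A hedged illustration of what such a parameterized call looks like, using the
Scala API that the PySpark binding under review wraps (sc is assumed to be an
existing SparkContext; 5.0 and 2.0 are the mean and standard deviation of the
hypothesised normal distribution). A Python doctest in the docstring would
mirror the same call shape.

    import org.apache.spark.mllib.stat.Statistics
    import org.apache.spark.rdd.RDD

    val data: RDD[Double] = sc.parallelize(Seq(4.1, 5.0, 5.9, 6.3, 3.8, 5.5))
    // Distribution parameters are passed after the distribution name.
    val result = Statistics.kolmogorovSmirnovTest(data, "norm", 5.0, 2.0)
    println(s"statistic=${result.statistic}, pValue=${result.pValue}")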


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org


