[GitHub] spark pull request: [SPARK-7173][YARN] Add label expression suppor...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/9800#discussion_r45254024

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -225,7 +225,32 @@ private[spark] class Client(
     val capability = Records.newRecord(classOf[Resource])
     capability.setMemory(args.amMemory + amMemoryOverhead)
     capability.setVirtualCores(args.amCores)
-    appContext.setResource(capability)
+
+    if (sparkConf.contains("spark.yarn.am.nodeLabelExpression")) {
+      try {
+        val amRequest = Records.newRecord(classOf[ResourceRequest])
+        amRequest.setResourceName(ResourceRequest.ANY)
+        amRequest.setPriority(Priority.newInstance(0))
+        amRequest.setCapability(capability)
+        amRequest.setNumContainers(1)
+        amRequest.setRelaxLocality(true)
--- End diff --

Is this required? It should be the default.

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.
---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request: [SPARK-11410] [PYSPARK] Add python bindings fo...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/9504#issuecomment-154201525

Title should have the [SQL] tag as well
[GitHub] spark pull request: [SPARK-10658][SPARK-11421][PYSPARK][CORE] Prov...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/9313#discussion_r43909799

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1611,8 +1611,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
    * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
+   * If addToCurrentThread is true, attempt to add the new class to the current thread's class
+   * loader.
    */
   def addJar(path: String) {
+    addJar(path, false)
+  }
+
+  def addJar(path: String, addToCurrentThread: Boolean) {
--- End diff --

In that case, maybe good to call this out explicitly in the doc? Also, the naming `addToCurrentThread` makes it seem like it'll get added *only* to the current thread, so maybe something like `addToContextClassLoader` would be more clear.
[GitHub] spark pull request: [SPARK-10658][SPARK-11421][PYSPARK][CORE] Prov...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/9313#discussion_r43726053

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1611,8 +1611,14 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
    * Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
    * The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
    * filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
+   * If addToCurrentThread is true, attempt to add the new class to the current thread's class
+   * loader.
    */
   def addJar(path: String) {
+    addJar(path, false)
+  }
+
+  def addJar(path: String, addToCurrentThread: Boolean) {
--- End diff --

Would it be correct to say that in nearly all cases, setting the second argument to true will result in the jar being added to all of the application's threads? Because Spark sets the context classloader to a MutableURLClassLoader before loading the application's main class, and then all other app threads will inherit this as the default unless they explicitly change the context class loader?
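The inheritance behavior the comment asks about can be checked with a small standalone JVM sketch in plain Scala (no Spark classes; any URLClassLoader stands in for Spark's MutableURLClassLoader here): a thread picks up its creator's context classloader at construction time unless it later overrides it.

```scala
import java.net.{URL, URLClassLoader}

// A distinct loader standing in for the loader Spark installs before
// running the application's main class.
val parentLoader = new URLClassLoader(Array.empty[URL], ClassLoader.getSystemClassLoader)
Thread.currentThread().setContextClassLoader(parentLoader)

// A thread spawned now inherits the creating thread's context classloader.
var inherited: ClassLoader = null
val t = new Thread(new Runnable {
  override def run(): Unit = { inherited = Thread.currentThread().getContextClassLoader }
})
t.start()
t.join()

println(inherited eq parentLoader)  // true: the child saw the parent's loader
```

This is why mutating the one shared context classloader can, in practice, affect every application thread that never called setContextClassLoader itself.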
[GitHub] spark pull request: [SPARK-10658][SPARK-11421][PYSPARK][CORE] Prov...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/9313#discussion_r43726148

--- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
@@ -1667,6 +1673,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     if (key != null) {
       addedJars(key) = System.currentTimeMillis
       logInfo("Added JAR " + path + " at " + key + " with timestamp " + addedJars(key))
+      if (addToCurrentThread) {
+        val currentCL = Utils.getContextOrSparkClassLoader
+        currentCL match {
+          case cl: MutableURLClassLoader => cl.addURL(new URI(key).toURL())
+          case _ => logWarning(s"Unsupported cl $currentCL will not update jars for current thread")
--- End diff --

My opinion is that it's better to throw an exception here. Though as that's an API behavior question, probably best to defer to a core maintainer? @pwendell any thoughts? And thoughts on adding this option to `addJar` in general?
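The pattern-match in the diff can be sketched standalone. Spark's MutableURLClassLoader is essentially a URLClassLoader with addURL made public; the MutableLoader class below is a hypothetical stand-in for it, not the Spark class.

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical stand-in for Spark's MutableURLClassLoader: expose the
// protected addURL so jars can be appended to a live loader.
class MutableLoader(urls: Array[URL], parent: ClassLoader)
    extends URLClassLoader(urls, parent) {
  override def addURL(url: URL): Unit = super.addURL(url)
}

// Mirror of the diff's match: mutate loaders we know how to mutate,
// report otherwise (the diff logs a warning; the review suggests throwing).
def tryAddUrl(cl: ClassLoader, url: URL): Boolean = cl match {
  case m: MutableLoader => m.addURL(url); true
  case _                => false
}

val loader = new MutableLoader(Array.empty[URL], ClassLoader.getSystemClassLoader)
val jarUrl = new java.io.File("example.jar").toURI.toURL
println(tryAddUrl(loader, jarUrl))                           // true
println(tryAddUrl(ClassLoader.getSystemClassLoader, jarUrl)) // false
println(loader.getURLs.contains(jarUrl))                     // true
```

The silent-warning vs. throw question is exactly the `case _` branch: returning false (or logging) hides the failure from the caller, while throwing makes the unsupported-loader case an explicit API error.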
[GitHub] spark pull request: [SPARK-10658][SPARK-11421][PYSPARK][CORE] Prov...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/9313#discussion_r43682628

--- Diff: core/src/main/scala/org/apache/spark/util/Utils.scala ---
@@ -175,6 +175,29 @@ private[spark] object Utils extends Logging {
   }

   /**
+   * Update the current thread's class loader.
+   * Requires that the current class loader is a MutableURLClassLoader, otherwise skips updating
+   * with a warning. Intended for use by addJar(), although constructing an instance of the class
+   * will still require:
+   *   sc._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass("class name")
+   * as described in SPARK-5185.
+   */
+  def updatePrimaryClassLoader(sc: SparkContext) {
--- End diff --

What's the advantage of this update method, which compares the full contents of `sc.addedJars` to those already registered with the current classloader, vs. just calling `cl.addURL` directly on the jar being added? A possibly confusing effect of this approach is that if I have code like:

    sc.addJar(jar1)
    ... other stuff ...
    sc.addJar(jar2, true)

jar1 will get added to the classpath only when `sc.addJar` is called the second time.
[GitHub] spark pull request: SPARK-4921. TaskSetManager.dequeueTask returns...
GitHub user sryza reopened a pull request: https://github.com/apache/spark/pull/3816

SPARK-4921. TaskSetManager.dequeueTask returns PROCESS_LOCAL for NO_PREF tasks

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sryza/spark sandy-spark-4921

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/3816.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3816

commit 247ce5587b8a06fe586a2730f1ad2df4ab7f79dc
Author: Sandy Ryza <sa...@cloudera.com>
Date: 2014-12-28T03:55:47Z

    SPARK-4921. TaskSetManager.dequeueTask returns PROCESS_LOCAL for NO_PREF tasks
[GitHub] spark pull request: SPARK-4921. TaskSetManager.dequeueTask returns...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/3816#issuecomment-153165000

@zsxwing I have sadly lost most of the context on this issue and don't have time to pick it back up at the moment. What you point out does seem to be an issue, though. Definitely feel free to take my patch if you think it's the right fix.
[GitHub] spark pull request: [SPARK-10658][PYSPARK] Provide add jars to py ...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/9313#issuecomment-152234819

Could we add that flag to the Scala API as well? Would that break binary compatibility?
[GitHub] spark pull request: [SPARK-10658][PYSPARK] Provide add jars to py ...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/9313#issuecomment-152069509

Does the Scala SparkContext#addJar add the jar to the driver classpath? My impression was that it does not. If so, this would be a little inconsistent, right?
[GitHub] spark pull request: [SPARK-10658][PYSPARK] Provide add jars to py ...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/9313#discussion_r43347726

--- Diff: python/pyspark/context.py ---
@@ -806,6 +806,24 @@ def addPyFile(self, path):
         import importlib
         importlib.invalidate_caches()

+    def addJar(self, path):
+        """
+        Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
+        The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
+        filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
+        """
+        self._jsc.sc().addJar(path)
+        self._jvm.PythonUtils.updatePrimaryClassLoader(self._jsc)
+
+    def _loadClass(self, className):
+        """
+        .. note:: Experimental
+
+        Loads a JVM class using the MutableURLClassLoader used by Spark.
+        This function exists because Py4J uses a different class loader.
+        """
+        self._jvm.java.lang.Thread.currentThread().getContextClassLoader().loadClass(className)
--- End diff --

In Utils.getContextOrSparkClassLoader, it seems like the context classloader can sometimes be null. Is that possible here as well?
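A null context classloader is a real JVM possibility (threads created from native code, or code that explicitly set it to null), which is what Utils.getContextOrSparkClassLoader guards against. A minimal standalone sketch of the same defensive fallback (the method name below is an assumption for illustration, not Spark's API):

```scala
// Fall back to a known-good loader when the context classloader is null.
def contextOrFallback(fallback: ClassLoader): ClassLoader =
  Option(Thread.currentThread().getContextClassLoader).getOrElse(fallback)

val saved = Thread.currentThread().getContextClassLoader
Thread.currentThread().setContextClassLoader(null)
val resolved = contextOrFallback(ClassLoader.getSystemClassLoader)
Thread.currentThread().setContextClassLoader(saved)  // restore

println(resolved ne null)                              // true
println(resolved eq ClassLoader.getSystemClassLoader)  // true
```

Calling `.loadClass` directly on the result of getContextClassLoader, as the diff does, would throw a NullPointerException in that situation; wrapping the lookup as above avoids it.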
[GitHub] spark pull request: [SPARK-10921] [YARN] Completely remove the use...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/8976#issuecomment-145976915

Taking these out seems like the right thing to do to me
[GitHub] spark pull request: [SPARK-9585] add config to enable inputFormat ...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7918#issuecomment-142201185

Ah, thanks for digging that up @rxin. In light of that, I definitely don't see a compelling reason to keep the input format caching around.
[GitHub] spark pull request: [SPARK-9585] add config to enable inputFormat ...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7918#issuecomment-140575553

Ah, yeah, it does look like ReflectionUtils.newInstance caches the constructor. In that case I'd be OK with removing the input format cache entirely. Any thoughts @JoshRosen ?
[GitHub] spark pull request: SPARK-10611 Clone Configuration for each task ...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/8763#issuecomment-140519129

Oh, nevermind, sorry, this is off by default.
[GitHub] spark pull request: SPARK-10611 Clone Configuration for each task ...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/8763#issuecomment-140518870

This seems like it could have a pretty serious perf impact. Are we able to do some benchmarking to assess this?
[GitHub] spark pull request: [SPARK-10288][Yarn] Add REST client for Spark ...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/8504#issuecomment-138678684

My reaction is similar to Andrew's. I haven't come across much user pain around the way that Spark submits apps to YARN. If there are particular use cases where the current way of doing things causes problems, I'd be willing to reconsider.
[GitHub] spark pull request: [SPARK-9585] add config to enable inputFormat ...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7918#issuecomment-138687768

Sorry for the delay here, have been on PTO.

IIUC, the change here makes Spark work with some exotic InputFormats that it previously did not work with due to thread safety, at the possible expense of performance. Users can revert to the old behavior with a config.

There's no associated JIRA, but a6eeb5ffd54956667ec4e793149fdab90041ad6c is the hash of the change that appears to have introduced the input format cache. Unfortunately, I don't see any performance numbers there justifying its addition.

It seems like the main overhead we're trying to avoid is the reflective calls. What about caching the constructor so that we don't need to look it up for each task?

Lastly, is an equivalent change needed for NewHadoopRDD?
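The constructor-caching idea suggested above can be sketched without Hadoop (all names here are illustrative, this is not ReflectionUtils): look the no-arg Constructor up once per class, then reuse it for every subsequent instantiation so per-task work is just newInstance.

```scala
import java.lang.reflect.Constructor
import scala.collection.mutable

// Cache the reflective Constructor lookup per class so repeated
// instantiation skips getDeclaredConstructor.
val ctorCache = mutable.Map.empty[Class[_], Constructor[_]]

def newInstanceCached[T](clazz: Class[T]): T = {
  val ctor = ctorCache.getOrElseUpdate(clazz, {
    val c = clazz.getDeclaredConstructor()
    c.setAccessible(true)
    c
  })
  ctor.newInstance().asInstanceOf[T]
}

val a = newInstanceCached(classOf[java.util.ArrayList[String]])
val b = newInstanceCached(classOf[java.util.ArrayList[String]])
println(a ne b)              // true: a fresh instance on each call
println(ctorCache.size == 1) // true: the reflective lookup happened once
```

Unlike caching the InputFormat instance itself, this keeps per-task instances (so thread-safety is preserved) while amortizing the reflective lookup cost the cache was presumably added to avoid.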
[GitHub] spark pull request: [SPARK-9969][YARN] Remove old MR classpath API...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/8192#issuecomment-132380275

Yeah, it's needed for the InputFormats that Spark relies on to read Hadoop data.
[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/8072#issuecomment-132214961

@tgravescs my thinking for just spark.yarn.tags was that it's redundant to include application, because configs are by definition per-application. We'd also be consistent with spark.yarn.queue and spark.yarn.jars. If we do include application, I think it should be spark.yarn.application-tags, because then we're not adding a new application namespace.
[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/8072#issuecomment-132218054

Cool, in that case @dennishuo mind making the change that @vanzin suggested and then I'll merge this?
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7075#issuecomment-131819801

@mengxr is it too late to get this in to 1.5? @josepablocam are you able to resolve merge conflicts?
[GitHub] spark pull request: SPARK-7707. User guide and example code for Ke...
GitHub user sryza opened a pull request: https://github.com/apache/spark/pull/8230

SPARK-7707. User guide and example code for KernelDensity

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sryza/spark sandy-spark-7707

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/8230.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #8230

commit 5120a645d11378d4c7a2ae634082b46469810632
Author: Sandy Ryza sa...@cloudera.com
Date: 2015-08-16T12:37:03Z

    SPARK-7707. User guide and example code for KernelDensity
[GitHub] spark pull request: [SPARK-9570][Docs][YARN]Consistent recommendat...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/8071#issuecomment-131547053

Sorry all for the delay, have been in Indonesia for the week. I'm also in favor of the standard way being --deploy-mode. This keeps things consistent with standalone and Mesos. I think it's probably fine if in some places in the docs we use yarn-client as shorthand to refer to running on YARN with the client deploy mode, but in the actual instructions on how to launch, my personal preference would be `--master yarn --deploy-mode client/cluster`.
[GitHub] spark pull request: SPARK-7707. User guide and example code for Ke...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/8230#issuecomment-131547427

retest this please
[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/8072#issuecomment-131679667

@tgravescs @vanzin @andrewor14 can I get one of you to sign off on my proposed property name `spark.yarn.tags`? Otherwise, this LGTM
[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/8072#issuecomment-130196650

jenkins, retest this please
[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/8072#discussion_r36833771

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -163,6 +163,23 @@ private[spark] class Client(
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
+    sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
+      .map(StringUtils.getTrimmedStringCollection(_))
+      .filter(!_.isEmpty())
+      .foreach { tagCollection =>
+        try {
+          // The setApplicationTags method was only introduced in Hadoop 2.4+, so we need to use
+          // reflection to set it, printing a warning if a tag was specified but the YARN version
+          // doesn't support it.
+          val method = appContext.getClass().getMethod(
+            "setApplicationTags", classOf[java.util.Set[String]])
+          method.invoke(appContext, new java.util.HashSet[String](tagCollection))
+        } catch {
+          case e: NoSuchMethodException =>
+            logWarning("Ignoring %s='%s' because this version of YARN does not support it
--- End diff --

Nit: use string interpolation: `s"Ignoring $CONF_SPARK_YARN_APPLICATION_TAGS because"`...
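The reflective call-with-fallback pattern in the diff generalizes to any method that only exists in newer versions of a dependency. A standalone sketch of the same pattern against java.lang.StringBuilder (a stand-in target; no YARN classes are needed to show the mechanism):

```scala
// Invoke a method by name if this runtime has it; otherwise degrade
// gracefully, as the diff does when Hadoop < 2.4 lacks setApplicationTags.
def invokeIfPresent(target: AnyRef, name: String, arg: AnyRef): Boolean =
  try {
    val method = target.getClass.getMethod(name, arg.getClass)
    method.invoke(target, arg)
    true
  } catch {
    case _: NoSuchMethodException => false  // older version: warn and move on
  }

val sb = new java.lang.StringBuilder
println(invokeIfPresent(sb, "append", "tag1"))           // true: method exists
println(sb.toString)                                     // tag1
println(invokeIfPresent(sb, "setApplicationTags", "x"))  // false: no such method
```

Catching NoSuchMethodException (rather than checking the Hadoop version string) keeps the code compilable against older YARN APIs while still using the feature when it is available.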
[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/8072#discussion_r36708697

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -163,6 +163,23 @@ private[spark] class Client(
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
+    sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
+      .map(StringUtils.getTrimmedStringCollection(_))
+      .filter(!_.isEmpty())
+      .foreach { tagCollection =>
+        try {
+          val method = appContext.getClass().getMethod(
+            "setApplicationTags", classOf[java.util.Set[String]])
+          method.invoke(appContext, new java.util.HashSet[String](tagCollection))
+          logInfo("Applied setApplicationTags based on %s='%s'"
+            .format(CONF_SPARK_YARN_APPLICATION_TAGS, tagCollection))
+
--- End diff --

Extra space unnecessary
[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/8072#discussion_r36708684

--- Diff: docs/running-on-yarn.md ---
@@ -320,6 +320,14 @@ If you need a reference to the proper location to put log files in the YARN so t
   </td>
 </tr>
 <tr>
+  <td><code>spark.yarn.application.tags</code></td>
--- End diff --

I would change this to simply spark.yarn.tags
[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/8072#discussion_r36708654

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
@@ -163,6 +163,23 @@ private[spark] class Client(
     appContext.setQueue(args.amQueue)
     appContext.setAMContainerSpec(containerContext)
     appContext.setApplicationType("SPARK")
+    sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
+      .map(StringUtils.getTrimmedStringCollection(_))
+      .filter(!_.isEmpty())
+      .foreach { tagCollection =>
+        try {
+          val method = appContext.getClass().getMethod(
--- End diff --

Add a comment here mentioning why this needs to be called via reflection
[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8072#discussion_r36708634

    --- Diff: docs/running-on-yarn.md ---
    @@ -320,6 +320,14 @@ If you need a reference to the proper location to put log files in the YARN so t
       </td>
     </tr>
     <tr>
    +  <td><code>spark.yarn.application.tags</code></td>
    +  <td>(none)</td>
    +  <td>
    +    Comma-separated list of strings to pass through as YARN application tags appearing
    +    in YARN ApplicationReports, which can be used for filtering when querying YARN.
    --- End diff --

    "when querying YARN apps"
[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8072#discussion_r36708965

    --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
    @@ -163,6 +163,23 @@ private[spark] class Client(
         appContext.setQueue(args.amQueue)
         appContext.setAMContainerSpec(containerContext)
         appContext.setApplicationType("SPARK")
    +    sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
    +      .map(StringUtils.getTrimmedStringCollection(_))
    +      .filter(!_.isEmpty())
    +      .foreach { tagCollection =>
    +        try {
    +          val method = appContext.getClass().getMethod(
    +            "setApplicationTags", classOf[java.util.Set[String]])
    +          method.invoke(appContext, new java.util.HashSet[String](tagCollection))
    +          logInfo("Applied setApplicationTags based on %s='%s'"
    --- End diff --

    I'd omit this log message. I don't think it's more important than other attributes like queue and app name, which we don't log.
[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8072#discussion_r36709001

    --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala ---
    @@ -163,6 +163,23 @@ private[spark] class Client(
         appContext.setQueue(args.amQueue)
         appContext.setAMContainerSpec(containerContext)
         appContext.setApplicationType("SPARK")
    +    sparkConf.getOption(CONF_SPARK_YARN_APPLICATION_TAGS)
    +      .map(StringUtils.getTrimmedStringCollection(_))
    +      .filter(!_.isEmpty())
    +      .foreach { tagCollection =>
    +        try {
    +          val method = appContext.getClass().getMethod(
    +            "setApplicationTags", classOf[java.util.Set[String]])
    +          method.invoke(appContext, new java.util.HashSet[String](tagCollection))
    +          logInfo("Applied setApplicationTags based on %s='%s'"
    +            .format(CONF_SPARK_YARN_APPLICATION_TAGS, tagCollection))
    +
    +        } catch {
    +          case e: NoSuchMethodException =>
    +            logWarning("Ignoring conf %s='%s'; setApplicationTags missing in this version of YARN."
    --- End diff --

    I'd say "Ignoring spark.yarn.tags because this version of YARN does not support it"
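The pattern under review guards an optional YARN API behind reflection so the same build compiles and runs against versions with and without `setApplicationTags`, falling back to a warning when the method is absent. A rough sketch of that "use the method if this version has it, otherwise warn and skip" idea, in Python (the `NewAppContext`/`OldAppContext` classes and `apply_tags` helper are invented stand-ins for illustration, not Spark or YARN APIs):

```python
import logging

class OldAppContext:
    """Stands in for an ApplicationSubmissionContext from an older API version."""
    pass

class NewAppContext:
    """Stands in for a newer API version that supports tags."""
    def __init__(self):
        self.tags = set()

    def set_application_tags(self, tags):
        self.tags = set(tags)

def apply_tags(app_context, tags):
    """Apply tags only if this version of the API supports them; warn otherwise.

    Mirrors the reflective getMethod/invoke + NoSuchMethodException pattern:
    look the method up by name, and degrade gracefully if it is missing.
    """
    setter = getattr(app_context, "set_application_tags", None)
    if setter is None:
        logging.warning("Ignoring tags because this API version does not support them")
        return False
    setter(tags)
    return True

apply_tags(NewAppContext(), ["etl", "nightly"])  # applied, returns True
apply_tags(OldAppContext(), ["etl"])             # logged and skipped, returns False
```

The design point is the same as in the diff: feature detection happens at call time against the running version, so no compile-time dependency on the newer API is introduced.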
[GitHub] spark pull request: [SPARK-9782] [YARN] Support YARN application t...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/8072#discussion_r36709027

    --- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala ---
    @@ -170,6 +173,39 @@ class ClientSuite extends SparkFunSuite with Matchers with BeforeAndAfterAll {
         cp should contain ("/remotePath/my1.jar")
       }
    +
    +  test("configuration and args propagate through createApplicationSubmissionContext") {
    --- End diff --

    Have you verified that this test passes both against versions of YARN that support app tags and against versions that do not?
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on the pull request:

    https://github.com/apache/spark/pull/7075#issuecomment-127758361

    This LGTM pending jenkins
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on the pull request:

    https://github.com/apache/spark/pull/7075#issuecomment-127758313

    jenkins, test this please
[GitHub] spark pull request: SPARK-8949 - Print warnings when using preferr...
Github user sryza commented on the pull request:

    https://github.com/apache/spark/pull/7874#issuecomment-127438873

    jenkins, test this please
[GitHub] spark pull request: SPARK-8949 - Print warnings when using preferr...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7874#discussion_r36062508

    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -113,6 +113,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
     /**
      * :: DeveloperApi ::
      * Alternative constructor for setting preferred locations where Spark will create executors.
    + * Note that preferred locations feature does not work as it is supposed to, see SPARK-8949
    --- End diff --

    I would be more explicit and say that passing in preferred node location data has no effect at all.
[GitHub] spark pull request: SPARK-8949 - Print warnings when using preferr...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7874#discussion_r36062537

    --- Diff: core/src/main/scala/org/apache/spark/SparkContext.scala ---
    @@ -153,6 +155,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
         preferredNodeLocationData: Map[String, Set[SplitInfo]] = Map()) = {
         this(SparkContext.updatedConf(new SparkConf(), master, appName, sparkHome, jars, environment))
    +    if (preferredNodeLocationData.size > 0)
    --- End diff --

    Use curly braces around if block
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7075#discussion_r36125365

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---
    @@ -53,6 +53,7 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
       object NullHypothesis extends Enumeration {
         type NullHypothesis = Value
         val OneSampleTwoSided = Value("Sample follows theoretical distribution")
    +    val TwoSampleTwoSided = Value("Both samples follow the same distribution")
    --- End diff --

    Nit: I'd make the above one "Sample follows *the* theoretical distribution" or make the bottom one "Both samples follow same distribution".
[GitHub] spark pull request: [SPARK-9375] Make sure the total number of exe...
Github user sryza commented on the pull request:

    https://github.com/apache/spark/pull/7716#issuecomment-127390297

    @andrewor14, the code that @KaiXinXiaoLei suggests fixing seems to be code most recently updated in SPARK-8119.
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7075#discussion_r36125136

    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
    @@ -254,4 +254,115 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
         assert(rCompResult.statistic ~== rKSStat relTol 1e-4)
         assert(rCompResult.pValue ~== rKSPVal relTol 1e-4)
       }
    +
    --- End diff --

    If making the methods package private would simplify the test, I think it's reasonable to do so
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7075#discussion_r35939544

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---
    @@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
         val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
         new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
       }
    +
    +  /**
    +   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests if the 2 samples
    +   * come from the same distribution
    +   * @param data1 `RDD[Double]` first sample of data
    +   * @param data2 `RDD[Double]` second sample of data
    +   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
    +   *         statistic, p-value, and appropriate null hypothesis
    +   */
    +  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = {
    +    val n1 = data1.count().toDouble
    +    val n2 = data2.count().toDouble
    +    val isSample1 = true // identifier for sample 1, needed after co-sort
    +    // combine identified samples
    +    val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1))
    +    // co-sort and operate on each partition
    +    val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions { part =>
    +      searchTwoSampleCandidates(part, n1, n2) // local extrema
    +    }.collect()
    +    val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: global extreme
    +    evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
    +  }
    +
    +  /**
    +   * Calculates maximum distance candidates and counts from each sample within one partition for
    +   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
    +   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs,
    +   *                 each element is additionally tagged with a boolean flag for sample 1 membership
    +   * @param n1 `Double` sample 1 size
    +   * @param n2 `Double` sample 2 size
    +   * @return `Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum
    +   *         distance, the second is an unadjusted maximum distance (both of which will later
    +   *         be adjusted by a constant to account for elements in prior partitions), and a
    +   *         count corresponding to the numerator of the adjustment constant coming from this
    +   *         partition
    +   */
    +  private def searchTwoSampleCandidates(
    +      partData: Iterator[(Double, Boolean)],
    +      n1: Double,
    +      n2: Double)
    +    : Iterator[(Double, Double, Double)] = {
    +    // fold accumulator: local minimum, local maximum, index for sample 1, index for sample 2
    +    case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int)
    +    val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0)
    +    // traverse the data in partition and calculate distances and counts
    +    val pResults = partData.foldLeft(initAcc) { case (acc: KS2Acc, (v, isSample1)) =>
    +      val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
    +      val cdf1 = (acc.ix1 + add1) / n1
    +      val cdf2 = (acc.ix2 + add2) / n2
    +      val dist = cdf1 - cdf2
    +      KS2Acc(math.min(acc.min, dist), math.max(acc.max, dist), acc.ix1 + add1, acc.ix2 + add2)
    +    }
    +    val results = if (pResults == initAcc) {
    +      Array[(Double, Double, Double)]()
    +    } else {
    +      Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2 - (pResults.ix2 + 1) * n1))
    +    }
    +    results.iterator
    +  }
    +
    +  /**
    +   * Adjust candidate extremes by the appropriate constant. The resulting maximum corresponds to
    +   * the two-sample, two-sided Kolmogorov-Smirnov test
    +   * @param localData `Array[(Double, Double, Double)]` contains the candidate extremes from each
    +   *                  partition, along with the numerator for the necessary constant adjustments
    +   * @param n `Double` The denominator in the constant adjustment (i.e. (size of sample 1) * (size
    +   *          of sample 2))
    +   * @return The two-sample, two-sided Kolmogorov-Smirnov statistic
    +   */
    +  private def searchTwoSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
    +    : Double = {
    +    val initAcc = (Double.MinValue, 0.0) // maximum distance and numerator for constant adjustment
    +    // adjust differences based on the number of elements preceding it, which should provide
    +    // the correct distance between the 2 empirical CDFs
    +    val results = localData.foldLeft(initAcc) { case ((prevMax, prevCt), (minCand, maxCand, ct)) =>
    +      val adjConst = prevCt / n
    +      val dist1 = math.abs(minCand
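For reference, the statistic the distributed fold above computes can be sketched on a single machine in a few lines: tag each value with its sample, co-sort, and track the running difference of the two empirical CDFs. This is a hedged illustration of the technique, not a port of `testTwoSamples`; like the fold in the diff, ties across samples are processed in sort order.

```python
def ks_two_sample_statistic(sample1, sample2):
    """Two-sample, two-sided KS statistic: max |F1(x) - F2(x)| over the pooled data."""
    n1, n2 = float(len(sample1)), float(len(sample2))
    # tag each value with sample membership, then co-sort (as in the diff)
    tagged = [(v, True) for v in sample1] + [(v, False) for v in sample2]
    tagged.sort(key=lambda t: t[0])
    ix1 = ix2 = 0  # running counts, i.e. the unscaled empirical CDFs
    best = 0.0
    for _, is_sample1 in tagged:
        if is_sample1:
            ix1 += 1
        else:
            ix2 += 1
        best = max(best, abs(ix1 / n1 - ix2 / n2))
    return best

# fully separated samples give the maximal statistic
ks_two_sample_statistic([1.0, 2.0, 3.0], [4.0, 5.0, 6.0])  # 1.0
```

The RDD version has to do exactly this walk, except the running counts are only known per partition, which is why the per-partition extrema are emitted "unadjusted" and corrected during the merge.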
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7075#discussion_r35939524

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---
    @@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
         val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
         new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
       }
    +
    +  /**
    +   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests if the 2 samples
    +   * come from the same distribution
    +   * @param data1 `RDD[Double]` first sample of data
    +   * @param data2 `RDD[Double]` second sample of data
    +   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
    +   *         statistic, p-value, and appropriate null hypothesis
    +   */
    +  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = {
    +    val n1 = data1.count().toDouble
    +    val n2 = data2.count().toDouble
    +    val isSample1 = true // identifier for sample 1, needed after co-sort
    +    // combine identified samples
    +    val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1))
    +    // co-sort and operate on each partition
    +    val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions { part =>
    +      searchTwoSampleCandidates(part, n1, n2) // local extrema
    +    }.collect()
    +    val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: global extreme
    +    evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
    +  }
    +
    +  /**
    +   * Calculates maximum distance candidates and counts from each sample within one partition for
    +   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
    +   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs,
    +   *                 each element is additionally tagged with a boolean flag for sample 1 membership
    +   * @param n1 `Double` sample 1 size
    +   * @param n2 `Double` sample 2 size
    +   * @return `Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum
    +   *         distance, the second is an unadjusted maximum distance (both of which will later
    +   *         be adjusted by a constant to account for elements in prior partitions), and a
    +   *         count corresponding to the numerator of the adjustment constant coming from this
    +   *         partition
    +   */
    +  private def searchTwoSampleCandidates(
    +      partData: Iterator[(Double, Boolean)],
    +      n1: Double,
    +      n2: Double)
    +    : Iterator[(Double, Double, Double)] = {
    +    // fold accumulator: local minimum, local maximum, index for sample 1, index for sample 2
    +    case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int)
    +    val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0)
    +    // traverse the data in partition and calculate distances and counts
    +    val pResults = partData.foldLeft(initAcc) { case (acc: KS2Acc, (v, isSample1)) =>
    +      val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
    +      val cdf1 = (acc.ix1 + add1) / n1
    +      val cdf2 = (acc.ix2 + add2) / n2
    +      val dist = cdf1 - cdf2
    +      KS2Acc(math.min(acc.min, dist), math.max(acc.max, dist), acc.ix1 + add1, acc.ix2 + add2)
    +    }
    +    val results = if (pResults == initAcc) {
    +      Array[(Double, Double, Double)]()
    +    } else {
    +      Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2 - (pResults.ix2 + 1) * n1))
    +    }
    +    results.iterator
    +  }
    +
    +  /**
    +   * Adjust candidate extremes by the appropriate constant. The resulting maximum corresponds to
    +   * the two-sample, two-sided Kolmogorov-Smirnov test
    +   * @param localData `Array[(Double, Double, Double)]` contains the candidate extremes from each
    +   *                  partition, along with the numerator for the necessary constant adjustments
    +   * @param n `Double` The denominator in the constant adjustment (i.e. (size of sample 1) * (size
    +   *          of sample 2))
    +   * @return The two-sample, two-sided Kolmogorov-Smirnov statistic
    +   */
    +  private def searchTwoSampleStatistic(localData: Array[(Double, Double, Double)], n: Double)
    +    : Double = {
    +    val initAcc = (Double.MinValue, 0.0) // maximum distance and numerator for constant adjustment
    --- End diff --

    Place this comment on the line above
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7075#discussion_r35939958

    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/stat/HypothesisTestSuite.scala ---
    @@ -254,4 +254,115 @@ class HypothesisTestSuite extends SparkFunSuite with MLlibTestSparkContext {
         assert(rCompResult.statistic ~== rKSStat relTol 1e-4)
         assert(rCompResult.pValue ~== rKSPVal relTol 1e-4)
       }
    +
    --- End diff --

    Are there weird edge cases here with the partitioning? E.g. where a partition has no elements? Or only elements from one sample? Can we provide tests for them?
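One way to convince yourself about the partitioning edge cases raised here is to model the candidate/merge scheme locally and check that the result is invariant to how the co-sorted data is split, including empty partitions and partitions containing only one sample's elements. The sketch below is a simplified model of the approach under my own assumptions (it uses `ix1 * n2 - ix2 * n1` as the per-partition correction numerator), not a line-for-line port of the PR:

```python
def partition_candidates(part, n1, n2):
    """One partition's pass over its slice of the co-sorted data: returns the
    unadjusted min/max CDF differences plus this partition's correction numerator."""
    lo, hi = float("inf"), float("-inf")
    ix1 = ix2 = 0
    for _, is_sample1 in part:
        if is_sample1:
            ix1 += 1
        else:
            ix2 += 1
        d = ix1 / n1 - ix2 / n2
        lo, hi = min(lo, d), max(hi, d)
    if ix1 == 0 and ix2 == 0:
        return []  # an empty partition contributes no candidates
    return [(lo, hi, ix1 * n2 - ix2 * n1)]

def merged_statistic(partitions, n1, n2):
    """Merge per-partition candidates, shifting each by the cumulative
    contribution of all earlier partitions (the 'adjustment constant')."""
    candidates = [c for part in partitions
                  for c in partition_candidates(part, n1, n2)]
    running = 0.0  # cumulative numerator from earlier partitions
    best = 0.0
    for lo, hi, ct in candidates:
        adj = running / (n1 * n2)
        best = max(best, abs(lo + adj), abs(hi + adj))
        running += ct
    return best

# co-sorted data: (value, belongs-to-sample-1)
data = [(1, True), (2, True), (3, False), (4, False),
        (5, True), (6, False), (7, True), (8, False)]
# a split with an empty partition and single-sample partitions
# yields the same statistic as a single partition
merged_statistic([data], 4.0, 4.0)                                          # 0.5
merged_statistic([data[:2], [], data[2:4], data[4:5], data[5:]], 4.0, 4.0)  # 0.5
```

Each degenerate split exercises one of the cases in the comment, so assertions of this invariance would make natural additions to the suite.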
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7075#discussion_r35939346

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---
    @@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
         val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
         new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
       }
    +
    +  /**
    +   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests if the 2 samples
    +   * come from the same distribution
    +   * @param data1 `RDD[Double]` first sample of data
    +   * @param data2 `RDD[Double]` second sample of data
    +   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
    +   *         statistic, p-value, and appropriate null hypothesis
    +   */
    +  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = {
    +    val n1 = data1.count().toDouble
    +    val n2 = data2.count().toDouble
    +    val isSample1 = true // identifier for sample 1, needed after co-sort
    +    // combine identified samples
    +    val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1))
    --- End diff --

    Call this `unionedData`
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7075#discussion_r35939722

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---
    @@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
         val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
         new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
       }
    +
    +  /**
    +   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests if the 2 samples
    +   * come from the same distribution
    +   * @param data1 `RDD[Double]` first sample of data
    +   * @param data2 `RDD[Double]` second sample of data
    +   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
    +   *         statistic, p-value, and appropriate null hypothesis
    +   */
    +  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = {
    +    val n1 = data1.count().toDouble
    +    val n2 = data2.count().toDouble
    +    val isSample1 = true // identifier for sample 1, needed after co-sort
    +    // combine identified samples
    +    val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1))
    +    // co-sort and operate on each partition
    +    val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions { part =>
    +      searchTwoSampleCandidates(part, n1, n2) // local extrema
    +    }.collect()
    +    val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: global extreme
    +    evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
    +  }
    +
    +  /**
    +   * Calculates maximum distance candidates and counts from each sample within one partition for
    +   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
    +   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs,
    +   *                 each element is additionally tagged with a boolean flag for sample 1 membership
    +   * @param n1 `Double` sample 1 size
    +   * @param n2 `Double` sample 2 size
    +   * @return `Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum
    +   *         distance, the second is an unadjusted maximum distance (both of which will later
    +   *         be adjusted by a constant to account for elements in prior partitions), and a
    +   *         count corresponding to the numerator of the adjustment constant coming from this
    +   *         partition
    +   */
    +  private def searchTwoSampleCandidates(
    +      partData: Iterator[(Double, Boolean)],
    +      n1: Double,
    +      n2: Double)
    +    : Iterator[(Double, Double, Double)] = {
    +    // fold accumulator: local minimum, local maximum, index for sample 1, index for sample 2
    +    case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int)
    +    val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0)
    +    // traverse the data in partition and calculate distances and counts
    --- End diff --

    nit: in *the* partition
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request:

    https://github.com/apache/spark/pull/7075#discussion_r35939695

    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---
    @@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
         val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
         new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
       }
    +
    +  /**
    +   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests if the 2 samples
    +   * come from the same distribution
    +   * @param data1 `RDD[Double]` first sample of data
    +   * @param data2 `RDD[Double]` second sample of data
    +   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
    +   *         statistic, p-value, and appropriate null hypothesis
    +   */
    +  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = {
    +    val n1 = data1.count().toDouble
    +    val n2 = data2.count().toDouble
    +    val isSample1 = true // identifier for sample 1, needed after co-sort
    +    // combine identified samples
    +    val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1))
    +    // co-sort and operate on each partition
    +    val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions { part =>
    +      searchTwoSampleCandidates(part, n1, n2) // local extrema
    +    }.collect()
    +    val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: global extreme
    +    evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
    +  }
    +
    +  /**
    +   * Calculates maximum distance candidates and counts from each sample within one partition for
    +   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
    +   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs,
    +   *                 each element is additionally tagged with a boolean flag for sample 1 membership
    +   * @param n1 `Double` sample 1 size
    +   * @param n2 `Double` sample 2 size
    +   * @return `Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum
    +   *         distance, the second is an unadjusted maximum distance (both of which will later
    +   *         be adjusted by a constant to account for elements in prior partitions), and a
    +   *         count corresponding to the numerator of the adjustment constant coming from this
    +   *         partition
    +   */
    +  private def searchTwoSampleCandidates(
    +      partData: Iterator[(Double, Boolean)],
    +      n1: Double,
    +      n2: Double)
    +    : Iterator[(Double, Double, Double)] = {
    +    // fold accumulator: local minimum, local maximum, index for sample 1, index for sample 2
    +    case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int)
    +    val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0)
    +    // traverse the data in partition and calculate distances and counts
    +    val pResults = partData.foldLeft(initAcc) { case (acc: KS2Acc, (v, isSample1)) =>
    +      val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
    +      val cdf1 = (acc.ix1 + add1) / n1
    +      val cdf2 = (acc.ix2 + add2) / n2
    +      val dist = cdf1 - cdf2
    +      KS2Acc(math.min(acc.min, dist), math.max(acc.max, dist), acc.ix1 + add1, acc.ix2 + add2)
    +    }
    +    val results = if (pResults == initAcc) {
    +      Array[(Double, Double, Double)]()
    +    } else {
    +      Array((pResults.min, pResults.max, (pResults.ix1 + 1) * n2 - (pResults.ix2 + 1) * n1))
    --- End diff --

    Looks like there are a couple spots with two consecutive spaces in here
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7075#discussion_r35939844

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---

@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
     val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
     new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *         statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = {
+    val n1 = data1.count().toDouble
+    val n2 = data2.count().toDouble
+    val isSample1 = true // identifier for sample 1, needed after co-sort
+    // combine identified samples
+    val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1))
+    // co-sort and operate on each partition
+    val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions { part =>
+      searchTwoSampleCandidates(part, n1, n2) // local extrema
+    }.collect()
+    val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: global extreme
+    evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts from each sample within one partition for
+   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs,
+   *        each element is additionally tagged with a boolean flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum
+   *         distance , the second is an unadjusted maximum distance (both of which will later
+   *         be adjusted by a constant to account for elements in prior partitions), and a
+   *         count corresponding to the numerator of the adjustment constant coming from this

--- End diff --

Can we describe in a little more detail what the mentioned count is a count of? Or if it's described somewhere else in the code, reference it?

---
If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
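The co-sort-then-local-extrema scheme quoted above can be checked outside Spark. The following Python toy (my own naming, not the PR's Scala or Spark's API) mirrors what `searchTwoSampleCandidates` and `searchTwoSampleStatistic` do, using explicit contiguous chunks in place of RDD partitions:

```python
import random

def local_extrema(part, n1, n2):
    # One pass over a chunk of the co-sorted, tagged data: track running
    # per-sample counts, and the min/max of (F1 - F2) *as if* the chunk
    # started at the beginning of the data (hence "unadjusted").
    lo, hi, c1, c2 = float("inf"), float("-inf"), 0, 0
    for _, from_sample1 in part:
        if from_sample1:
            c1 += 1
        else:
            c2 += 1
        d = c1 / n1 - c2 / n2
        lo, hi = min(lo, d), max(hi, d)
    return lo, hi, c1, c2

def two_sample_ks_stat(data1, data2, num_chunks=4):
    n1, n2 = len(data1), len(data2)
    # tag and co-sort both samples, then split into contiguous "partitions"
    tagged = sorted([(x, True) for x in data1] + [(x, False) for x in data2])
    size = -(-len(tagged) // num_chunks)  # ceiling division
    chunks = [tagged[i:i + size] for i in range(0, len(tagged), size)]
    # second pass: shift each chunk's extrema by the counts accumulated in
    # all prior chunks, and take the global maximum absolute distance
    ks, cum1, cum2 = 0.0, 0, 0
    for lo, hi, c1, c2 in (local_extrema(c, n1, n2) for c in chunks):
        adj = cum1 / n1 - cum2 / n2  # the per-chunk "adjustment constant"
        ks = max(ks, abs(lo + adj), abs(hi + adj))
        cum1, cum2 = cum1 + c1, cum2 + c2
    return ks
```

Because the adjustment is constant within a chunk, shifting the chunk's min/max is the same as shifting every point, so the chunking never changes the answer — which is what makes the per-partition `mapPartitions` pass safe.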
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7075#discussion_r35939884

--- Diff: docs/mllib-statistics.md ---

@@ -431,11 +431,16 @@ user tests against the normal distribution (`distName="norm"`), but does not provide
 parameters, the test initializes to the standard normal distribution and logs an appropriate
 message.
+
+There is also a 2-sample, 2-sided implementation available, which tests if the 2 samples are drawn

--- End diff --

Rather than "which tests if", I'd mention the null hypothesis.
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7075#discussion_r35939785

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---

@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
     val pval = 1 - new KolmogorovSmirnovTest().cdf(ksStat, n.toInt)
     new KolmogorovSmirnovTestResult(pval, ksStat, NullHypothesis.OneSampleTwoSided.toString)
   }
+
+  /**
+   * Implements a two-sample, two-sided Kolmogorov-Smirnov test, which tests if the 2 samples
+   * come from the same distribution
+   * @param data1 `RDD[Double]` first sample of data
+   * @param data2 `RDD[Double]` second sample of data
+   * @return [[org.apache.spark.mllib.stat.test.KolmogorovSmirnovTestResult]] with the test
+   *         statistic, p-value, and appropriate null hypothesis
+   */
+  def testTwoSamples(data1: RDD[Double], data2: RDD[Double]): KolmogorovSmirnovTestResult = {
+    val n1 = data1.count().toDouble
+    val n2 = data2.count().toDouble
+    val isSample1 = true // identifier for sample 1, needed after co-sort
+    // combine identified samples
+    val joinedData = data1.map(x => (x, isSample1)) ++ data2.map(x => (x, !isSample1))
+    // co-sort and operate on each partition
+    val localData = joinedData.sortBy { case (v, id) => v }.mapPartitions { part =>
+      searchTwoSampleCandidates(part, n1, n2) // local extrema
+    }.collect()
+    val ksStat = searchTwoSampleStatistic(localData, n1 * n2) // result: global extreme
+    evalTwoSampleP(ksStat, n1.toInt, n2.toInt)
+  }
+
+  /**
+   * Calculates maximum distance candidates and counts from each sample within one partition for
+   * the two-sample, two-sided Kolmogorov-Smirnov test implementation
+   * @param partData `Iterator[(Double, Boolean)]` the data in 1 partition of the co-sorted RDDs,
+   *        each element is additionally tagged with a boolean flag for sample 1 membership
+   * @param n1 `Double` sample 1 size
+   * @param n2 `Double` sample 2 size
+   * @return `Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum
+   *         distance , the second is an unadjusted maximum distance (both of which will later
+   *         be adjusted by a constant to account for elements in prior partitions), and a
+   *         count corresponding to the numerator of the adjustment constant coming from this
+   *         partition
+   */
+  private def searchTwoSampleCandidates(
+      partData: Iterator[(Double, Boolean)],
+      n1: Double,
+      n2: Double)
+    : Iterator[(Double, Double, Double)] = {
+    // fold accumulator: local minimum, local maximum, index for sample 1, index for sample 2
+    case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int)
+    val initAcc = KS2Acc(Double.MaxValue, Double.MinValue, 0, 0)
+    // traverse the data in partition and calculate distances and counts
+    val pResults = partData.foldLeft(initAcc) { case (acc: KS2Acc, (v, isSample1)) =>
+      val (add1, add2) = if (isSample1) (1, 0) else (0, 1)
+      val cdf1 = (acc.ix1 + add1) / n1
+      val cdf2 = (acc.ix2 + add2) / n2
+      val dist = cdf1 - cdf2
+      KS2Acc(math.min(acc.min, dist), math.max(acc.max, dist), acc.ix1 + add1, acc.ix2 + add2)
+    }
+    val results = if (pResults == initAcc) {

--- End diff --

Do things go bad if we don't special case here? Or is it just less efficient?
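The final `evalTwoSampleP` step turns the statistic into a p-value via the Kolmogorov distribution with effective sample size n1*n2/(n1+n2). I don't know which exact series the PR evaluates, so the following Python sketch is illustrative only: it uses the standard asymptotic alternating series, with the common small-sample correction to the argument (the 0.12 and 0.11 terms popularized by Numerical Recipes):

```python
import math

def two_sample_ks_pvalue(d, n1, n2):
    # Effective sample size for the two-sample test.
    m = n1 * n2 / (n1 + n2)
    # Small-sample correction to the asymptotic argument.
    t = (math.sqrt(m) + 0.12 + 0.11 / math.sqrt(m)) * d
    # Smirnov's alternating series P(D > d) = 2 * sum_k (-1)^(k-1) exp(-2 k^2 t^2),
    # truncated once the terms are negligible; clamp to [0, 1].
    p = 2.0 * sum((-1.0) ** (k - 1) * math.exp(-2.0 * (k * t) ** 2)
                  for k in range(1, 101))
    return min(max(p, 0.0), 1.0)
```

For example, a distance of 0.2 between two samples of 100 points each gives a p-value of roughly 0.03 under this approximation, i.e. significant at the usual 5% level.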
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7075#discussion_r35939573

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---

@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
 [... same hunk as above, through the `testTwoSamples` method and the
 `searchTwoSampleCandidates` doc comment ...]
+  private def searchTwoSampleCandidates(
+      partData: Iterator[(Double, Boolean)],
+      n1: Double,
+      n2: Double)
+    : Iterator[(Double, Double, Double)] = {

--- End diff --

This can go on the line above
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7075#discussion_r35939624

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---

@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
 [... same hunk as above, through `searchTwoSampleCandidates` ...]
+    // fold accumulator: local minimum, local maximum, index for sample 1, index for sample 2
+    case class KS2Acc(min: Double, max: Double, ix1: Int, ix2: Int)

--- End diff --

KS2Acc is a little cryptic. Maybe `ExtremaPositions`?
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7075#discussion_r35939648

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---

@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
 [... same hunk as above, through the `searchTwoSampleCandidates` doc comment ...]
+   * @return `Iterator[(Double, Double, Double)]` where the first element is an unadjusted minimum
+   *         distance , the second is an unadjusted maximum distance (both of which will later

--- End diff --

extra space before comma
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7075#discussion_r35939466

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/KolmogorovSmirnovTest.scala ---

@@ -190,5 +191,93 @@ private[stat] object KolmogorovSmirnovTest extends Logging {
 [... same hunk as above, through the `searchTwoSampleCandidates` doc comment ...]
+   *         distance , the second is an unadjusted maximum distance (both of which will later
+   *         be adjusted by a constant to account for elements in prior partitions), and a

--- End diff --

and the third is
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7075#discussion_r35939406

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala ---

@@ -196,4 +196,18 @@ object Statistics {
     : KolmogorovSmirnovTestResult = {
     KolmogorovSmirnovTest.testOneSample(data, distName, params: _*)
   }
+
+  /**
+   * Perform a two-sample, two-sided Kolmogorov-Smirnov test for probability distribution equality
+   * The null hypothesis corresponds to both samples coming from the same distribution

--- End diff --

"corresponds" is a little weird. Maybe just "The null hypothesis is that both samples come from the same distribution."
[GitHub] spark pull request: [SPARK-8674] [MLlib] Implementation of a 2 sam...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7075#issuecomment-126536916

@mengxr @josepablocam oops, thought it was still a WIP for some reason. Just took a pass. It looks mostly done - I just had a bunch of nits and a test request.
[GitHub] spark pull request: [SPARK-9375] Make sure the total number of exe...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7716#issuecomment-125735637

@KaiXinXiaoLei we fixed a couple of issues that had this symptom in 1.3. Are you definitely running with a version that's 1.4 or later?
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6394#issuecomment-125372128

Great. LGTM as well. Just merged this. Thanks @jerryshao!
[GitHub] spark pull request: [SPARK-9388] [yarn] Make executor info log mes...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7706#discussion_r35601139

--- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala ---

@@ -86,10 +86,15 @@ class ExecutorRunnable(
     val commands = prepareCommand(masterAddress, slaveId, hostname, executorMemory,
       executorCores, appId, localResources)

-    logInfo(s"Setting up executor with environment: $env")
-    logInfo("Setting up executor with commands: " + commands)
-    ctx.setCommands(commands)
+    logInfo("===")
+    logInfo("Yarn executor launch context:")

--- End diff --

Nit: capitalize all of YARN
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6394#issuecomment-125083816

@kayousterhout do the DAGScheduler changes look good to you after @jerryshao's last pass?
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35399233

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala ---

@@ -28,7 +28,10 @@ private[spark] trait ExecutorAllocationClient {
    * This can result in canceling pending requests or filing additional requests.
    * @return whether the request is acknowledged by the cluster manager.
    */
-  private[spark] def requestTotalExecutors(numExecutors: Int): Boolean
+  private[spark] def requestTotalExecutors(
+      numExecutors: Int,

--- End diff --

Mind adding the above text to the documentation?
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35293276

--- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala ---

@@ -526,6 +537,19 @@ private[spark] class ExecutorAllocationManager(
 allocationManager.synchronized {
   stageIdToNumTasks(stageId) = numTasks
   allocationManager.onSchedulerBacklogged()
+
+  var numTasksPending = 0

--- End diff --

Ah just looked deeper into the code and it seems like localities are often empty. So nothing to be done here, sorry for the noise.
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r35344727

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala ---

@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.stat.test
+
+import scala.annotation.varargs
+
+import collection.immutable.ListMap
+
+import org.apache.commons.math3.distribution.{ExponentialDistribution, GumbelDistribution,
+  LogisticDistribution, NormalDistribution, WeibullDistribution}
+
+import org.apache.spark.Logging
+import org.apache.spark.rdd.RDD
+
+/**
+ * The Anderson-Darling (AD) test, similarly to the Kolmogorov-Smirnov (KS) test, tests whether
+ * the data follow a given theoretical distribution. It should be used with continuous data and
+ * assumes that no repeated values occur (the presence of ties can affect the validity of the
+ * test). The AD test provides an alternative to the KS test. Namely, it is better suited to
+ * identify departures from the theoretical distribution at the tails. It is worth noting that
+ * the AD test's critical values depend on the distribution being tested against. The AD
+ * statistic is defined as
+ * {{{
+ * A^2 = -N - \frac{1}{N}\sum_{i = 1}^{N} (2i - 1)(\ln{\Phi(x_i)} + \ln(1 - \Phi(x_{N+1-i})))
+ * }}}
+ * where {{{\Phi}}} is the CDF of the given distribution, `N` is the sample size, and the
+ * sample is sorted in ascending order.
+ * For more information @see [[https://en.wikipedia.org/wiki/Anderson%E2%80%93Darling_test]]
+ */
+private[stat] object AndersonDarlingTest extends Logging {
+
+  object NullHypothesis extends Enumeration {
+    type NullHypothesis = Value
+    val OneSample = Value("Sample follows theoretical distribution.")
+  }
+
+  /**
+   * AndersonDarlingTheoreticalDist is a trait that every distribution used in an AD test must
+   * extend. The rationale for this is that the AD test has distribution-dependent critical
+   * values, and by requiring extension of this trait we guarantee that future additional
+   * distributions make sure to add the appropriate critical values (CVs) (or at least
+   * acknowledge that they should be added)
+   */
+  sealed trait AndersonDarlingTheoreticalDist extends Serializable {
+    val params: Seq[Double] // parameters used to initialize the distribution
+
+    def cdf(x: Double): Double // calculate the cdf under the given distribution for value x
+
+    def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, adjusted for sample size
+  }
+
+  /**
+   * Critical values and adjustments for distributions sourced from
+   * [[http://civil.colorado.edu/~balajir/CVEN5454/lectures/Ang-n-Tang-Chap7-Goodness-of-fit-PDFs-test.pdf]]
+   * [[https://github.com/scipy/scipy/blob/v0.15.1/scipy/stats/morestats.py#L1017]], which in turn
+   * references:
+   *
+   * Stephens, M. A. (1974). EDF Statistics for Goodness of Fit and Some Comparisons,
+   * Journal of the American Statistical Association, Vol. 69, pp. 730-737.
+   *
+   * Stephens, M. A. (1976). Asymptotic Results for Goodness-of-Fit Statistics with Unknown
+   * Parameters, Annals of Statistics, Vol. 4, pp. 357-369.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit for the Extreme Value Distribution, Biometrika,
+   * Vol. 64, pp. 583-588.
+   *
+   * Stephens, M. A. (1977). Goodness of Fit with Special Reference to Tests for Exponentiality,
+   * Technical Report No. 262, Department of Statistics, Stanford University, Stanford, CA.
+   *
+   * Stephens, M. A. (1979). Tests of Fit for the Logistic Distribution Based on the Empirical
+   * Distribution Function, Biometrika, Vol. 66, pp. 591-595.
+   */
+
+  // Exponential distribution
+  class AndersonDarlingExponential(val params: Seq[Double]) extends
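The statistic in the doc comment above is easy to cross-check numerically. A short Python sketch (function names are mine, not the PR's; the standard normal CDF comes from `math.erf`):

```python
import math

def std_normal_cdf(x):
    # Phi(x) for the standard normal distribution, via the error function.
    return 0.5 * (1.0 + math.erf(x / math.sqrt(2.0)))

def anderson_darling(sample, cdf=std_normal_cdf):
    # A^2 = -N - (1/N) * sum_{i=1}^{N} (2i - 1) * (ln Phi(x_i) + ln(1 - Phi(x_{N+1-i})))
    # with x_1 <= ... <= x_N the sorted sample; the 0-based loop below uses
    # (2i + 1) for i = 0..N-1, which is the same sum.
    xs = sorted(sample)
    n = len(xs)
    s = sum((2 * i + 1) * (math.log(cdf(xs[i])) + math.log(1.0 - cdf(xs[n - 1 - i])))
            for i in range(n))
    return -n - s / n
```

For the toy sample [-1, 0, 1], which sits symmetrically under the standard normal, the statistic works out to about 0.19, well below the usual 5% critical value, as expected for a well-fitting sample.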
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r35343248

--- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala ---

@@ -0,0 +1,289 @@
 [... same hunk as above, through the AndersonDarlingTheoreticalDist doc comment ...]
+  sealed trait AndersonDarlingTheoreticalDist extends Serializable {
+    val params: Seq[Double] // parameters used to initialize the distribution

--- End diff --

Nit: put comments above, as opposed to after, these class members
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7278#issuecomment-124167898 This LGTM after some cosmetic changes. @mengxr what do you think?
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r35342576 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/Statistics.scala --- @@ -196,4 +196,33 @@ object Statistics { : KolmogorovSmirnovTestResult = { KolmogorovSmirnovTest.testOneSample(data, distName, params: _*) } + + /** + * Conduct a 1-sample Anderson-Darling test for the null hypothesis that the data + * come from a given theoretical distribution. The Anderson-Darling test is an alternative + * to the Kolmogorov-Smirnov test, and is better suited to identifying departures from the + * theoretical distribution at the tails. The implementation returns an + * `AndersonDarlingTestResult`, which includes the statistic, the critical values at varying + * significance levels, and the null hypothesis. Note that the critical values are calculated + * assuming the parameters have been calculated from the data sample. + * If the parameters for the theoretical distribution are not in a valid domain, throws an + * exception. + * @param data the data to be tested + * @param distName name of the theoretical distribution to test against. Currently + * supports Normal ("norm"), Exponential ("exp"), Gumbel ("gumbel"), + * Logistic ("logistic"), and Weibull ("weibull") distributions + * @param params provides the parameters for the theoretical distribution. + * The order of parameters is as follows: + * Normal - [mu, sigma] (location, scale) + * Exponential - [1 / lambda] (scale) + * Gumbel - [mu, beta] (location, scale) + * Logistic - [mu, s] (location, scale) + * Weibull - [lambda, k] (scale, shape) + * @return --- End diff -- Give something short for @return
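The parameter conventions documented above can be made concrete with a small dispatcher. This is a hypothetical sketch (my own `DistCdfs` object, not the PR's API); only the closed-form CDFs are shown, since the Normal case would additionally need an erf implementation.

```scala
object DistCdfs {
  // Map a distribution name and its parameter sequence (in the documented
  // order) to a CDF function. Unsupported names raise an exception, mirroring
  // the "invalid domain" behavior described in the scaladoc.
  def cdfFor(distName: String, params: Seq[Double]): Double => Double =
    (distName, params) match {
      case ("exp", Seq(scale)) =>         // scale = 1 / lambda
        x => if (x < 0) 0.0 else 1 - math.exp(-x / scale)
      case ("gumbel", Seq(mu, beta)) =>   // location, scale
        x => math.exp(-math.exp(-(x - mu) / beta))
      case ("logistic", Seq(mu, s)) =>    // location, scale
        x => 1.0 / (1.0 + math.exp(-(x - mu) / s))
      case ("weibull", Seq(lambda, k)) => // scale, shape
        x => if (x < 0) 0.0 else 1 - math.exp(-math.pow(x / lambda, k))
      case _ =>
        throw new IllegalArgumentException(s"Unsupported distribution: $distName")
    }
}
```

For example, `cdfFor("exp", Seq(2.0))` yields the CDF of an exponential with scale 2, i.e. rate lambda = 0.5.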
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r35343649 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala --- [...] + sealed trait AndersonDarlingTheoreticalDist extends Serializable { + val params: Seq[Double] // parameters used to initialize the distribution + + def cdf(x: Double): Double // calculate the cdf under the given distribution for value x + + def getCVs(n: Double): Map[Double, Double] // return appropriate CVs, adjusted for sample size --- End diff -- I'd call this getCriticalValues for clarity
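The suggested rename can be illustrated with a minimal version of the trait. This is a hedged sketch, not the PR's code: the type names are simplified, the exponential critical values are the asymptotic ones from Stephens (1974) for significance levels 15%/10%/5%/2.5%/1%, and the small-sample handling shown should be treated as illustrative.

```scala
// Contract every tested distribution must satisfy, with the reviewer's
// suggested name getCriticalValues in place of getCVs.
sealed trait TheoreticalDist extends Serializable {
  val params: Seq[Double]
  def cdf(x: Double): Double
  def getCriticalValues(n: Double): Map[Double, Double]
}

class ExponentialDist(val params: Seq[Double]) extends TheoreticalDist {
  private val scale = params.head // scale = 1 / lambda
  def cdf(x: Double): Double = if (x < 0) 0.0 else 1 - math.exp(-x / scale)
  def getCriticalValues(n: Double): Map[Double, Double] = {
    // Asymptotic critical values keyed by significance level, scaled here by
    // the finite-sample factor (1 + 0.6/n) so the raw statistic can be
    // compared against them directly.
    val asymptotic = Map(0.15 -> 0.922, 0.10 -> 1.078, 0.05 -> 1.341,
      0.025 -> 1.606, 0.01 -> 1.957)
    asymptotic.map { case (sig, cv) => sig -> cv / (1 + 0.6 / n) }
  }
}
```

Sealing the trait plus this per-distribution method is what forces every new distribution to confront the critical-values question, as the scaladoc above argues.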
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r35344003 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala --- [...]
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r35344145 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala --- [...]
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r35344395 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala --- [...]
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r35344356 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala --- [...]
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r35343600 --- Diff: mllib/src/main/scala/org/apache/spark/mllib/stat/test/AndersonDarlingTest.scala --- [...]
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7278#discussion_r35344486 --- Diff: docs/mllib-statistics.md --- @@ -456,6 +456,39 @@ val testResult2 = Statistics.kolmogorovSmirnovTest(data, myCDF) </div> </div> +MLlib also provides a 1-sample Anderson-Darling test, which allows users to test a given +sample of data, in the form of `RDD[Double]`, against one of various continuous distributions. +The statistic can be used to test the null hypothesis that the data come from that given +distribution. Thus the Anderson-Darling test is a goodness-of-fit for continuous --- End diff -- goodness-of-fit *test*
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35291311 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -526,6 +537,19 @@ private[spark] class ExecutorAllocationManager( allocationManager.synchronized { stageIdToNumTasks(stageId) = numTasks allocationManager.onSchedulerBacklogged() + + var numTasksPending = 0 --- End diff -- What situation does that occur in? When a stage has some tasks with locality preferences and some tasks without?
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35147210 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -526,6 +537,19 @@ private[spark] class ExecutorAllocationManager( allocationManager.synchronized { stageIdToNumTasks(stageId) = numTasks allocationManager.onSchedulerBacklogged() + + var numTasksPending = 0 --- End diff -- You can take this out and use `taskLocalityPreferences.size()`, right?
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35146872 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationClient.scala --- @@ -28,7 +28,10 @@ private[spark] trait ExecutorAllocationClient { * This can result in canceling pending requests or filing additional requests. * @return whether the request is acknowledged by the cluster manager. */ - private[spark] def requestTotalExecutors(numExecutors: Int): Boolean + private[spark] def requestTotalExecutors( + numExecutors: Int, --- End diff -- To be as clear as possible, can you update the documentation for this method to the following? Also, change `localityAwarePendingTasks` to `localityAwareTasks`, because the tasks need not be pending. --- Update the cluster manager on our scheduling needs. Three bits of information are included to help it make decisions. @param numExecutors The total number of executors we'd like to have. The cluster manager shouldn't kill any running executor to reach this number, but, if all existing executors were to die, this is the number of executors we'd want to be allocated. @param localityAwareTasks The number of tasks in all active stages that have locality preferences. This includes running, pending, and completed tasks. @param hostToLocalTaskCount A map of hosts to the number of tasks from all active stages that would like to run on that host. This includes running, pending, and completed tasks.
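As a rough illustration of the contract in the suggested documentation, here is a hypothetical Python stand-in for the method (the name, signature, and trivial validation are illustrative only, not Spark's actual Scala implementation):

```python
def request_total_executors(num_executors, locality_aware_tasks, host_to_local_task_count):
    """Update the cluster manager on our scheduling needs (illustrative stand-in).

    num_executors: total executors we'd like; no running executor is killed to reach it.
    locality_aware_tasks: tasks in all active stages that have locality preferences
        (running, pending, and completed alike).
    host_to_local_task_count: host -> number of tasks from active stages that would
        like to run on that host.
    Returns whether the (hypothetical) cluster manager acknowledges the request.
    """
    if num_executors < 0 or locality_aware_tasks < 0:
        return False
    if any(count < 0 for count in host_to_local_task_count.values()):
        return False
    # A real backend would forward these three hints to the cluster manager here.
    return True
```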
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35145466 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) + +private[yarn] trait ContainerPlacementStrategy { + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * @param numLocalityAwarePendingTasks number of locality required pending tasks + * @param hostToLocalTaskCount a map to store the preferred hostname and possible task + * numbers running on it, used as hints for container allocation + * @return node localities and rack localities, each locality is an array of string, + * the length of localities is the same as number of containers + */ + def localityOfRequestedContainers( + numContainer: Int, + numLocalityAwarePendingTasks: Int, + hostToLocalTaskCount: Map[String, Int] +): Array[ContainerLocalityPreferences] +} + +/** + * This strategy is calculating the optimal locality preferences of YARN containers by considering + * the node ratio of pending tasks, number of required cores/containers and and locality of current + * existed containers. The target of this algorithm is to maximize the number of tasks that + * would run locally. + * + * The details of this algorithm is described as below, if we have 20 tasks which + * require (host1, host2, host3) and 10 tasks which require (host1, host2, host4), + * besides each container has 2 cores and cpus per task is 1, so the required container number is + * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10). + * + * 1. 
If requested container number (18) is more than the required container number (15): + * + * requests for 5 containers with nodes: (host1, host2, host3, host4) + * requests for 5 containers with nodes: (host1, host2, host3) + * requests for 5 containers with nodes: (host1, host2) + * requests for 3 containers with no locality preferences. + * + * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers with no locality + * preferences. + * + * 2. If requested container number (10) is less than or equal to the required container number + * (15): + * + * requests for 4 containers with nodes: (host1, host2, host3, host4) + * requests for 3 containers with nodes: (host1, host2, host3) + * requests for 3 containers with nodes: (host1, host2) + * + * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 : 2 : 1) + * + * 3. If containers are existed but no matching localities, follow the method of 1 and 2. + * + * 4. If containers are existed and some localities are matched. For example if we have 1 + * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and the expected containers + * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2), + * so the newly requested containers on each node would be updated to (host1: 4, host2: 4, + * host3: 3, host4: 1), 12 containers by total. + * + * 4.1 If requested container number (18) is more than newly required containers (12). Follow + * method 1 with updated ratio 4 : 4 : 3 : 1. + * + * 4.2 If request container number (10) is more than newly required containers (12). Follow + * method 2 with updated ratio 4 : 4 : 3 : 1. + * + * 5. If containers are existed and existing localities can fully cover
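The ratio-based splitting in cases 1 and 2 of the quoted scaladoc can be sketched roughly as follows. This is a hypothetical Python rendering of the documented examples, not the actual Spark code; `num_required` stands for the container count needed by the locality-aware tasks, and `host_weights` for the documented host ratio:

```python
import math
from itertools import groupby

def locality_of_requested_containers(num_requested, num_required, host_weights):
    """Split num_requested container requests into (count, preferred_nodes) groups.

    host_weights maps host -> number of tasks preferring it (the "host ratio").
    Containers beyond num_required carry no locality preference (case 1); otherwise
    the per-request node lists shrink so host appearances track the weight ratio.
    """
    locality_aware = min(num_requested, num_required) if host_weights else 0
    no_pref = num_requested - locality_aware
    groups = []
    if locality_aware > 0:
        w_max = max(host_weights.values())
        # Host h appears in the first ceil(locality_aware * w_h / w_max) requests,
        # which reproduces the documented 3 : 3 : 2 : 1 style placement ratios.
        appearances = {h: math.ceil(locality_aware * w / w_max)
                       for h, w in host_weights.items()}
        per_request = [tuple(sorted(h for h, n in appearances.items() if n > i))
                       for i in range(locality_aware)]
        groups = [(len(list(g)), nodes) for nodes, g in groupby(per_request)]
    if no_pref > 0:
        groups.append((no_pref, ()))
    return groups
```

With the scaladoc's numbers (weights host1: 30, host2: 30, host3: 20, host4: 10 and 15 required containers), requesting 18 yields groups of 5, 5, and 5 locality-aware requests plus 3 with no preference (case 1), and requesting 10 yields groups of 4, 3, and 3 (case 2).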
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35147681 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -526,6 +537,19 @@ private[spark] class ExecutorAllocationManager( allocationManager.synchronized { stageIdToNumTasks(stageId) = numTasks allocationManager.onSchedulerBacklogged() + +var numTasksPending = 0 --- End diff -- Is it possible for the localities to ever be empty?
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35147827 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -526,6 +537,19 @@ private[spark] class ExecutorAllocationManager( allocationManager.synchronized { stageIdToNumTasks(stageId) = numTasks allocationManager.onSchedulerBacklogged() + --- End diff -- Add a short comment here: // Compute the number of tasks requested by the stage on each host
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35148779 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -637,6 +662,24 @@ private[spark] class ExecutorAllocationManager( def isExecutorIdle(executorId: String): Boolean = { !executorIdToTaskIds.contains(executorId) } + +/** + * Get a tuple of (the number of task with locality preferences, a map where each pair is a + * node and the number of tasks that would like to be scheduled on that node). + */ --- End diff -- Mention: "These hints are updated when stages arrive and complete, so are not up-to-date at task granularity within stages."
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35149276 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,196 @@ ... +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) + +private[yarn] trait ContainerPlacementStrategy { --- End diff -- Consolidate this with LocalityPreferredContainerPlacementStrategy, because there are no other implementations. I agree that it's often nice to have interfaces separate from their implementations, even if there's only a single implementation, but I haven't seen this generally practiced in Spark.
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35151031 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala --- @@ -225,8 +245,11 @@ private[yarn] class YarnAllocator( logInfo(s"Will request $missing executor containers, each with ${resource.getVirtualCores} " + s"cores and ${resource.getMemory} MB memory including $memoryOverhead MB overhead") - for (i <- 0 until missing) { -val request = createContainerRequest(resource) + val containerLocalityPreferences = containerPlacementStrategy.localityOfRequestedContainers( --- End diff -- It's possible that, since the last time we made container requests, stages have completed and been submitted, and that the localities at which we requested our pending executors no longer apply to our current needs. Can we remove all outstanding container requests and add requests anew each time to avoid this? Ideally we would run a benchmark to see how computationally expensive this is. At the very least, can we add a TODO here explaining the problem and file a follow-up JIRA to address this?
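The remove-and-re-request pattern suggested here could look roughly like this. It is a hypothetical sketch with a stub client; the real YarnAllocator talks to YARN's application-master client, whose API is not reproduced here:

```python
class StubAMClient:
    """Minimal stand-in for an application-master client, for illustration only."""
    def __init__(self, outstanding=None):
        self.outstanding = list(outstanding or [])

    def remove_container_request(self, request):
        self.outstanding.remove(request)

    def add_container_request(self, request):
        self.outstanding.append(request)


def resync_container_requests(client, fresh_requests):
    # Cancel every outstanding request, whose locality hints may be stale
    # (stages may have completed or been submitted since they were filed)...
    for request in list(client.outstanding):
        client.remove_container_request(request)
    # ...then file requests computed from the current stages' locality hints.
    for request in fresh_requests:
        client.add_container_request(request)
```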
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35151608 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,196 @@ ... + * 3. If containers are existed but no matching localities, follow the method of 1 and 2. --- End diff -- What exactly is meant by matching localities?
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35148929 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala --- @@ -127,6 +127,16 @@ private[yarn] class YarnAllocator( } } + // A map to store preferred hostname and possible task numbers running on it. + private var hostToLocalTaskCounts: Map[String, Int] = Map.empty + + // Locality required pending task number --- End diff -- Number of tasks that have locality preferences in active stages
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35149314 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,196 @@ ...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35151721 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,196 @@ ... + * 4. If containers are existed and some localities are matched. For example if we have 1 --- End diff -- `If containers are existed` -> `If containers exist`
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/6394#issuecomment-123474754 I made a final pass on this for clarity issues. I'm being nitpicky on some of the stuff, but I think it's important to be as clear as possible, given that what's going on here is inherently pretty complicated. @kayousterhout are you satisfied with the DAGScheduler changes? If so, I think that after this batch of comments is addressed, the patch should be good to go.
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35148005 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -637,6 +662,24 @@ private[spark] class ExecutorAllocationManager( def isExecutorIdle(executorId: String): Boolean = { !executorIdToTaskIds.contains(executorId) } + +/** + * Get a tuple of (the number of task with locality preferences, a map where each pair is a + * node and the number of tasks that would like to be scheduled on that node). + */ +def executorPlacementHints(): (Int, Map[String, Int]) = + allocationManager.synchronized { +var localityAwarePendingTasks = 0 --- End diff -- IIUC, these tasks might be running or completed, so replace with `localityAwareTasks`
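The hint-gathering that `executorPlacementHints` performs might be sketched like so. This is a hypothetical Python rendering (the real Scala method aggregates per-stage data under `allocationManager.synchronized`; the input shape here is assumed for illustration):

```python
from collections import defaultdict

def executor_placement_hints(stage_to_task_host_prefs):
    """Return (localityAwareTasks, hostToLocalTaskCount) from per-stage preferences.

    stage_to_task_host_prefs: stageId -> list of per-task preferred-host lists.
    Tasks with an empty preference list carry no locality hint and are not counted.
    """
    locality_aware_tasks = 0
    host_to_local_task_count = defaultdict(int)
    for task_prefs in stage_to_task_host_prefs.values():
        for hosts in task_prefs:
            if hosts:
                locality_aware_tasks += 1
                for host in hosts:
                    host_to_local_task_count[host] += 1
    return locality_aware_tasks, dict(host_to_local_task_count)
```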
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35151169 --- Diff: yarn/src/test/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategySuite.scala --- @@ -0,0 +1,133 @@ ... +package org.apache.spark.deploy.yarn + +import org.scalatest.{BeforeAndAfterEach, Matchers} + +import org.apache.spark.SparkFunSuite + +class ContainerPlacementStrategySuite extends SparkFunSuite with Matchers with BeforeAndAfterEach { + + private val yarnAllocatorSuite = new YarnAllocatorSuite --- End diff -- Ah ok my bad
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35151299 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) + +private[yarn] trait ContainerPlacementStrategy { + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * @param numLocalityAwarePendingTasks number of locality required pending tasks + * @param hostToLocalTaskCount a map to store the preferred hostname and possible task + * numbers running on it, used as hints for container allocation + * @return node localities and rack localities, each locality is an array of string, + * the length of localities is the same as number of containers + */ + def localityOfRequestedContainers( + numContainer: Int, + numLocalityAwarePendingTasks: Int, + hostToLocalTaskCount: Map[String, Int] +): Array[ContainerLocalityPreferences] +} + +/** + * This strategy is calculating the optimal locality preferences of YARN containers by considering + * the node ratio of pending tasks, number of required cores/containers and and locality of current + * existed containers. The target of this algorithm is to maximize the number of tasks that --- End diff -- `existed` - `existing`
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35151397 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) + +private[yarn] trait ContainerPlacementStrategy { + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * @param numLocalityAwarePendingTasks number of locality required pending tasks + * @param hostToLocalTaskCount a map to store the preferred hostname and possible task + * numbers running on it, used as hints for container allocation + * @return node localities and rack localities, each locality is an array of string, + * the length of localities is the same as number of containers + */ + def localityOfRequestedContainers( + numContainer: Int, + numLocalityAwarePendingTasks: Int, + hostToLocalTaskCount: Map[String, Int] +): Array[ContainerLocalityPreferences] +} + +/** + * This strategy is calculating the optimal locality preferences of YARN containers by considering + * the node ratio of pending tasks, number of required cores/containers and and locality of current + * existed containers. The target of this algorithm is to maximize the number of tasks that + * would run locally. + * + * The details of this algorithm is described as below, if we have 20 tasks which --- End diff -- Replace this with Consider a situation in which we have 20 tasks...
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35152686 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) + +private[yarn] trait ContainerPlacementStrategy { + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * @param numLocalityAwarePendingTasks number of locality required pending tasks + * @param hostToLocalTaskCount a map to store the preferred hostname and possible task + * numbers running on it, used as hints for container allocation + * @return node localities and rack localities, each locality is an array of string, + * the length of localities is the same as number of containers + */ + def localityOfRequestedContainers( + numContainer: Int, + numLocalityAwarePendingTasks: Int, + hostToLocalTaskCount: Map[String, Int] +): Array[ContainerLocalityPreferences] +} + +/** + * This strategy is calculating the optimal locality preferences of YARN containers by considering + * the node ratio of pending tasks, number of required cores/containers and and locality of current + * existed containers. The target of this algorithm is to maximize the number of tasks that + * would run locally. + * + * The details of this algorithm is described as below, if we have 20 tasks which + * require (host1, host2, host3) and 10 tasks which require (host1, host2, host4), + * besides each container has 2 cores and cpus per task is 1, so the required container number is + * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10). + * + * 1. 
If requested container number (18) is more than the required container number (15): + * + * requests for 5 containers with nodes: (host1, host2, host3, host4) + * requests for 5 containers with nodes: (host1, host2, host3) + * requests for 5 containers with nodes: (host1, host2) + * requests for 3 containers with no locality preferences. + * + * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers with no locality + * preferences. + * + * 2. If requested container number (10) is less than or equal to the required container number + * (15): + * + * requests for 4 containers with nodes: (host1, host2, host3, host4) + * requests for 3 containers with nodes: (host1, host2, host3) + * requests for 3 containers with nodes: (host1, host2) + * + * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 : 2 : 1) + * + * 3. If containers are existed but no matching localities, follow the method of 1 and 2. + * + * 4. If containers are existed and some localities are matched. For example if we have 1 + * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and the expected containers + * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2), + * so the newly requested containers on each node would be updated to (host1: 4, host2: 4, + * host3: 3, host4: 1), 12 containers by total. + * + * 4.1 If requested container number (18) is more than newly required containers (12). Follow + * method 1 with updated ratio 4 : 4 : 3 : 1. + * + * 4.2 If request container number (10) is more than newly required containers (12). Follow + * method 2 with updated ratio 4 : 4 : 3 : 1. + * + * 5. If containers are existed and existing localities can fully cover
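The ratio-based decomposition described in the quoted doc comment can be sketched in plain Python. This is an illustrative reconstruction from the worked numbers above (cases 1 and 2), not the actual `LocalityPreferredContainerPlacementStrategy` code; the function name and signature are made up:

```python
import math

def locality_preferences(requested, required, host_task_count):
    """Split `requested` container requests into groups, each with a list of
    preferred nodes, so hosts with more pending tasks appear in more groups."""
    # Only as many containers as the locality-aware tasks need carry preferences.
    constrained = min(requested, required)
    largest = max(host_task_count.values())
    # Expected containers per host, proportional to its pending-task count.
    remaining = {h: math.ceil(constrained * c / largest)
                 for h, c in host_task_count.items()}
    groups = []
    while any(v > 0 for v in remaining.values()):
        hosts = sorted(h for h, v in remaining.items() if v > 0)
        step = min(remaining[h] for h in hosts)   # peel off the smallest layer
        groups.append((step, hosts))
        for h in hosts:
            remaining[h] -= step
    if requested > required:
        groups.append((requested - required, []))  # extras get no preference
    return groups
```

With the host ratio (host1: 30, host2: 30, host3: 20, host4: 10) and `required = 15`, requesting 10 containers reproduces the (4, 3, 3) grouping from case 2, and requesting 18 reproduces the (5, 5, 5, 3-unconstrained) grouping from case 1.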
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35152962 --- Diff: yarn/src/main/scala/org/apache/spark/deploy/yarn/ContainerPlacementStrategy.scala --- @@ -0,0 +1,196 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the License); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an AS IS BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.deploy.yarn + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.yarn.util.RackResolver + +import org.apache.spark.SparkConf + +private[yarn] case class ContainerLocalityPreferences(nodes: Array[String], racks: Array[String]) + +private[yarn] trait ContainerPlacementStrategy { + + /** + * Calculate each container's node locality and rack locality + * @param numContainer number of containers to calculate + * @param numLocalityAwarePendingTasks number of locality required pending tasks + * @param hostToLocalTaskCount a map to store the preferred hostname and possible task + * numbers running on it, used as hints for container allocation + * @return node localities and rack localities, each locality is an array of string, + * the length of localities is the same as number of containers + */ + def localityOfRequestedContainers( + numContainer: Int, + numLocalityAwarePendingTasks: Int, + hostToLocalTaskCount: Map[String, Int] +): Array[ContainerLocalityPreferences] +} + +/** + * This strategy is calculating the optimal locality preferences of YARN containers by considering + * the node ratio of pending tasks, number of required cores/containers and and locality of current + * existed containers. The target of this algorithm is to maximize the number of tasks that + * would run locally. + * + * The details of this algorithm is described as below, if we have 20 tasks which + * require (host1, host2, host3) and 10 tasks which require (host1, host2, host4), + * besides each container has 2 cores and cpus per task is 1, so the required container number is + * 15, and host ratio is (host1: 30, host2: 30, host3: 20, host4: 10). + * + * 1. 
If requested container number (18) is more than the required container number (15): + * + * requests for 5 containers with nodes: (host1, host2, host3, host4) + * requests for 5 containers with nodes: (host1, host2, host3) + * requests for 5 containers with nodes: (host1, host2) + * requests for 3 containers with no locality preferences. + * + * The placement ratio is 3 : 3 : 2 : 1, and set the additional containers with no locality + * preferences. + * + * 2. If requested container number (10) is less than or equal to the required container number + * (15): + * + * requests for 4 containers with nodes: (host1, host2, host3, host4) + * requests for 3 containers with nodes: (host1, host2, host3) + * requests for 3 containers with nodes: (host1, host2) + * + * The placement ratio is 10 : 10 : 7 : 4, close to expected ratio (3 : 3 : 2 : 1) + * + * 3. If containers are existed but no matching localities, follow the method of 1 and 2. + * + * 4. If containers are existed and some localities are matched. For example if we have 1 + * containers on each node (host1: 1, host2: 1: host3: 1, host4: 1), and the expected containers + * on each node would be (host1: 5, host2: 5, host3: 4, host4: 2), + * so the newly requested containers on each node would be updated to (host1: 4, host2: 4, + * host3: 3, host4: 1), 12 containers by total. + * + * 4.1 If requested container number (18) is more than newly required containers (12). Follow + * method 1 with updated ratio 4 : 4 : 3 : 1. + * + * 4.2 If request container number (10) is more than newly required containers (12). Follow + * method 2 with updated ratio 4 : 4 : 3 : 1. + * + * 5. If containers are existed and existing localities can fully cover
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35148070 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -637,6 +662,24 @@ private[spark] class ExecutorAllocationManager( def isExecutorIdle(executorId: String): Boolean = { !executorIdToTaskIds.contains(executorId) } + +/** + * Get a tuple of (the number of task with locality preferences, a map where each pair is a --- End diff -- `task` - `tasks`
[GitHub] spark pull request: [SPARK-4352][YARN][WIP] Incorporate locality p...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/6394#discussion_r35148227 --- Diff: core/src/main/scala/org/apache/spark/ExecutorAllocationManager.scala --- @@ -295,7 +295,9 @@ private[spark] class ExecutorAllocationManager( // If the new target has not changed, avoid sending a message to the cluster manager if (numExecutorsTarget != oldNumExecutorsTarget) { -client.requestTotalExecutors(numExecutorsTarget) +val (localityAwarePendingTasks, preferredLocalities) = listener.executorPlacementHints() --- End diff -- Can we cache the result of `executorPlacementHints` and only update it when stages come or go?
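The caching the reviewer suggests amounts to an event-invalidated memo: recompute the hints only when a stage is submitted or completed, not on every allocation round. A minimal sketch of that pattern (hypothetical helper, not Spark code; names are made up):

```python
class PlacementHintCache:
    """Cache a computed value and recompute it only after a stage event."""

    def __init__(self, compute):
        self._compute = compute  # expensive: e.g. walks every pending task
        self._cached = None
        self._dirty = True

    def on_stage_submitted(self):
        self._dirty = True       # pending-task locality may have changed

    def on_stage_completed(self):
        self._dirty = True

    def get(self):
        if self._dirty:
            self._cached = self._compute()
            self._dirty = False
        return self._cached
```

Between stage events, `get()` returns the memoized result, so frequent allocation rounds pay nothing.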
[GitHub] spark pull request: [SPARK-8884] [MLlib] 1-sample Anderson-Darling...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7278#issuecomment-123123557 jenkins, retest this please
[GitHub] spark pull request: [SPARK-8996] [MLlib] [PySpark] Python API for ...
Github user sryza commented on the pull request: https://github.com/apache/spark/pull/7430#issuecomment-122346380 LGTM modulo a small comment
[GitHub] spark pull request: [SPARK-8996] [MLlib] [PySpark] Python API for ...
Github user sryza commented on a diff in the pull request: https://github.com/apache/spark/pull/7430#discussion_r34910046 --- Diff: python/pyspark/mllib/stat/_statistics.py --- @@ -238,6 +242,60 @@ def chiSqTest(observed, expected=None): jmodel = callMLlibFunc("chiSqTest", _convert_to_vector(observed), expected) return ChiSqTestResult(jmodel) +@staticmethod +@ignore_unicode_prefix +def kolmogorovSmirnovTest(data, distName="norm", *params): + +.. note:: Experimental + +Performs the Kolmogorov Smirnov (KS) test for data sampled from +a continuous distribution. It tests the null hypothesis that +the data is generated from a particular distribution. + +The given data is sorted and the Empirical Cumulative +Distribution Function (ECDF) is calculated +which for a given point is the number of points having a CDF +value lesser than it divided by the total number of points. + +Since the data is sorted, this is a step function +that rises by (1 / length of data) for every ordered point. + +The KS statistic gives us the maximum distance between the +ECDF and the CDF. Intuitively if this statistic is large, the +probabilty that the null hypothesis is true becomes small. +For specific details of the implementation, please have a look +at the Scala documentation. + +:param data: RDD, samples from the data +:param distName: string, currently only "norm" is supported. + (Normal distribution) to calculate the + theoretical distribution of the data. +:param params: additional values which need to be provided for + a certain distribution. + If not provided, the default values are used. +:return: KolmogorovSmirnovTestResult object containing the test + statistic, degrees of freedom, p-value, + the method used, and the null hypothesis. + + kstest = Statistics.kolmogorovSmirnovTest --- End diff -- Small thing: can we include an example that passes parameters in?
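The statistic this API computes can be sketched in plain Python, independent of PySpark. This is an illustrative stdlib-only version (the function name and the use of `statistics.NormalDist` are my own; the real implementation lives in the Scala MLlib code the docstring references):

```python
from statistics import NormalDist

def ks_statistic(data, mu=0.0, sigma=1.0):
    """Maximum distance between the empirical CDF of `data` and the CDF of
    Normal(mu, sigma) -- the quantity the KS test is built on."""
    xs = sorted(data)
    n = len(xs)
    dist = NormalDist(mu, sigma)
    d = 0.0
    for i, x in enumerate(xs):
        cdf = dist.cdf(x)
        # The ECDF steps from i/n to (i+1)/n at x; the supremum can be
        # attained on either side of the step, so check both.
        d = max(d, abs(cdf - i / n), abs(cdf - (i + 1) / n))
    return d
```

The extra distribution parameters are exactly what the reviewer asks the docstring example to show: in the PySpark API they would be passed positionally after the distribution name, analogous to `mu` and `sigma` here.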