Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/1878#discussion_r16338266
--- Diff:
examples/src/main/scala/org/apache/spark/examples/mllib/SampledRDDs.scala ---
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.mllib.util.MLUtils
+import scopt.OptionParser
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.SparkContext._
+
+/**
+ * An example app for randomly generated and sampled RDDs. Run with
+ * {{{
+ * bin/run-example org.apache.spark.examples.mllib.SampledRDDs
+ * }}}
+ * If you use it as a template to create your own app, please use
`spark-submit` to submit your app.
+ */
+object SampledRDDs {
+
+ /** CLI parameters for this example: the input path (LIBSVM-format labeled data). */
+ case class Params(input: String =
"data/mllib/sample_binary_classification_data.txt")
+
+ // Entry point: parse command-line args into Params, then run the example.
+ def main(args: Array[String]) {
+ val defaultParams = Params()
+
+ // scopt-based CLI parser; the only option is --input (path to LIBSVM data).
+ val parser = new OptionParser[Params]("SampledRDDs") {
+ head("SampledRDDs: an example app for randomly generated and sampled
RDDs.")
+ opt[String]("input")
+ .text(s"Input path to labeled examples in LIBSVM format, default:
${defaultParams.input}")
+ .action((x, c) => c.copy(input = x))
+ note(
+ """
+ |For example, the following command runs this app:
+ |
+ | bin/spark-submit --class
org.apache.spark.examples.mllib.SampledRDDs \
+ | examples/target/scala-*/spark-examples-*.jar
+ """.stripMargin)
+ }
+
+ // On successful parse, run the example; otherwise exit with a failure status.
+ parser.parse(args, defaultParams).map { params =>
+ run(params)
+ } getOrElse {
+ sys.exit(1)
+ }
+ }
+
+
+ // Loads LIBSVM data and demonstrates RDD.sample/takeSample plus
+ // approximate (sampleByKey) and exact (sampleByKeyExact) stratified sampling.
+ def run(params: Params) {
+ val conf = new SparkConf().setAppName(s"SampledRDDs with $params")
+ val sc = new SparkContext(conf)
+
+ val fraction = 0.1 // fraction of data to sample
+
+ val examples = MLUtils.loadLibSVMFile(sc, params.input)
+ val numExamples = examples.count()
+ println(s"Loaded data with $numExamples examples from file:
${params.input}")
+
+ // Example: RDD.sample() and RDD.takeSample()
+ val expectedSampleSize = (numExamples * fraction).toInt
+ println(s"Sampling RDD using fraction $fraction. Expected sample size
= $expectedSampleSize.")
+ val sampledRDD = examples.sample(withReplacement = true, fraction =
fraction)
+ println(s" RDD.sample(): sample has ${sampledRDD.count()} examples")
+ val sampledArray = examples.takeSample(withReplacement = true, num =
expectedSampleSize)
+ println(s" RDD.takeSample(): sample has ${sampledArray.size}
examples")
+
+ println()
+
+ // Example: RDD.sampleByKey() and RDD.sampleByKeyExact()
+ val keyedRDD = examples.map { lp => (lp.label.toInt, lp.features) }
+ println(s" Keyed data using label (Int) as key ==> Orig")
+ // Count examples per label in original data.
+ val keyCounts = keyedRDD.countByKey()
+
+ // Subsample, and count examples per label in sampled data.
(approximate)
+ val fractions = keyCounts.keys.map((_, fraction)).toMap
+ val sampledByKeyRDD = keyedRDD.sampleByKey(withReplacement = true,
fractions = fractions)
+ val keyCountsB = sampledByKeyRDD.countByKey()
+ val sizeB = keyCountsB.values.sum
+ println(s" Sampled $sizeB examples using approximate stratified
sampling (by label)." +
+ " ==> Approx Sample")
+
+ // Subsample, and count examples per label in sampled data.
(exact)
+ val sampledByKeyRDDExact =
+ keyedRDD.sampleByKeyExact(withReplacement = true, fractions =
fractions)
+ val keyCountsBExact = sampledByKeyRDDExact.countByKey()
+ val sizeBExact = keyCountsBExact.values.sum
+ println(s" Sampled $sizeBExact examples using exact stratified
sampling (by label)." +
+ " ==> Exact Sample")
+
+ // Compare samples
+ println(s" \tFractions of examples with key")
+ println(s"Key\tOrig\tApprox Sample\tExact Sample")
+ keyCounts.keys.toSeq.sorted.foreach { key =>
+ val origFrac = keyCounts(key) / numExamples.toDouble
+ // NOTE(review): keyCountsB may not contain `key` (the approximate sample
+ // can drop a stratum entirely); safer: keyCountsB.getOrElse(key, 0L).
+ val approxFrac = keyCountsB(key) / sizeB.toDouble
--- End diff --
There is a chance that `keyCountsB` doesn't contain `key`. It is safer to
use `keyCountsB.getOrElse` here.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]