Github user manishamde commented on a diff in the pull request:
https://github.com/apache/spark/pull/3000#discussion_r19645862
--- Diff:
mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala ---
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.feature
+
+import org.apache.spark.Logging
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg.{Vectors, DenseVector, SparseVector,
Vector}
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.collection.OpenHashSet
+
+/**
+ * :: Experimental ::
+ * Class for indexing columns in a dataset.
+ *
+ * This helps process a dataset of unknown vectors into a dataset with
some continuous features
+ * and some categorical features. The choice between continuous and
categorical is based upon
+ * a maxCategories parameter.
+ *
+ * This can also map categorical feature values to 0-based indices.
+ *
+ * Usage:
+ * val myData1: RDD[Vector] = ...
+ * val myData2: RDD[Vector] = ...
+ * val datasetIndexer = new DatasetIndexer(maxCategories)
+ * datasetIndexer.fit(myData1)
+ * val indexedData1: RDD[Vector] = datasetIndexer.transform(myData1)
+ * datasetIndexer.fit(myData2)
+ * val indexedData2: RDD[Vector] = datasetIndexer.transform(myData2)
+ * val categoricalFeaturesInfo: Map[Int, Int] =
datasetIndexer.getCategoricalFeaturesInfo()
+ *
+ * TODO: Add option for transform: defaultForUnknownValue (default index
for unknown category).
+ *
+ * TODO: Add warning if a categorical feature has only 1 category.
+ */
+@Experimental
+class DatasetIndexer(
+ val maxCategories: Int,
+ val ignoreUnrecognizedCategories: Boolean = true)
+ extends Logging with Serializable {
+
+ require(maxCategories > 1,
+ s"DatasetIndexer given maxCategories = $maxCategories, but requires
maxCategories > 1.")
+
+ private class FeatureValueStats(val numFeatures: Int, val maxCategories:
Int)
+ extends Serializable {
+
+ val featureValueSets =
Array.fill[OpenHashSet[Double]](numFeatures)(new OpenHashSet[Double]())
+
+ /**
+ * Merge other [[FeatureValueStats]] into this instance, modifying
this instance.
+ * @param other Other instance. Not modified.
+ * @return This instance
+ */
+ def merge(other: FeatureValueStats): FeatureValueStats = {
+ featureValueSets.zip(other.featureValueSets).foreach { case (fvs1,
fvs2) =>
+ fvs2.iterator.foreach { val2 =>
+ if (fvs1.size <= maxCategories) fvs1.add(val2)
+ }
+ }
+ this
+ }
+
+ def addDenseVector(dv: DenseVector): Unit = {
+ var i = 0
+ while (i < dv.size) {
+ if (featureValueSets(i).size <= maxCategories) {
+ featureValueSets(i).add(dv(i))
+ }
+ i += 1
+ }
+ }
+
+ def addSparseVector(sv: SparseVector): Unit = {
+ // TODO: This could be made more efficient.
+ var vecIndex = 0 // index into vector
+ var nzIndex = 0 // index into non-zero elements
+ while (vecIndex < sv.size) {
+ val featureValue = if (nzIndex < sv.indices.size && vecIndex ==
sv.indices(nzIndex)) {
+ nzIndex += 1
+ sv.values(nzIndex - 1)
+ } else {
+ 0.0
+ }
+ if (featureValueSets(vecIndex).size <= maxCategories) {
+ featureValueSets(vecIndex).add(featureValue)
+ }
+ vecIndex += 1
+ }
+ }
+
+ }
+
+ /**
+ * Array (over features) of sets of distinct feature values (up to
maxCategories values).
+ * Null values in array indicate feature has been determined to be
continuous.
+ *
+ * Once the number of elements in a feature's set reaches maxCategories
+ 1,
+ * then it is declared continuous, and we stop adding elements.
+ */
+ private var featureValueStats: Option[FeatureValueStats] = None
+
+ /**
+ * Scans a dataset once and updates statistics about each column.
+ * The statistics are used to choose categorical features and re-index
them.
+ *
+ * Warning: Calling this on a new dataset changes the feature statistics
and thus
+ * can change the behavior of [[transform]] and
[[getCategoricalFeatureIndexes]].
+ * It is best to [[fit]] on all datasets before calling
[[transform]] on any.
+ *
+ * @param data Dataset with equal-length vectors.
+ * NOTE: A single instance of [[DatasetIndexer]] must
always be given vectors of
+ * the same length. If given non-matching vectors, this
method will throw an error.
+ */
+ def fit(data: RDD[Vector]): Unit = {
+ // For each partition, get (featureValueStats, newNumFeatures).
+ // If all vectors have the same length, then newNumFeatures = -1.
+ // If a vector with a new length is found, then newNumFeatures is set
to that length.
+ val partitionFeatureValueSets: RDD[(Option[FeatureValueStats], Int)] =
+ data.mapPartitions { iter =>
+ // Make local copy of featureValueStats.
+ // This will be None initially if this is the first dataset to be
fitted.
+ var localFeatureValueStats: Option[FeatureValueStats] =
featureValueStats
+ var localNumFeatures: Int = -1
--- End diff --
-1 is used in several places to signify an uninitialized value. A named private val
might be better, even if its value is still -1.
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]