Github user manishamde commented on a diff in the pull request:

    https://github.com/apache/spark/pull/3000#discussion_r19636313
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/feature/DatasetIndexer.scala ---
    @@ -0,0 +1,280 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.feature
    +
    +import org.apache.spark.Logging
    +import org.apache.spark.annotation.Experimental
    +import org.apache.spark.mllib.linalg.{Vectors, DenseVector, SparseVector, Vector}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.collection.OpenHashSet
    +
    +/**
    + * :: Experimental ::
    + * Class for indexing columns in a dataset.
    + *
    + * This helps process a dataset of unknown vectors into a dataset with some continuous features
    + * and some categorical features. The choice between continuous and categorical is based upon
    + * a maxCategories parameter.
    + *
    + * This can also map categorical feature values to 0-based indices.
    + *
    + * Usage:
    + *   val myData1: RDD[Vector] = ...
    + *   val myData2: RDD[Vector] = ...
    + *   val datasetIndexer = new DatasetIndexer(maxCategories)
    + *   datasetIndexer.fit(myData1)
    + *   val indexedData1: RDD[Vector] = datasetIndexer.transform(myData1)
    + *   datasetIndexer.fit(myData2)
    + *   val indexedData2: RDD[Vector] = datasetIndexer.transform(myData2)
    + *   val categoricalFeaturesInfo: Map[Int, Int] = datasetIndexer.getCategoricalFeaturesInfo()
    + *
    + * TODO: Add option for transform: defaultForUnknownValue (default index for unknown category).
    + *
    + * TODO: Add warning if a categorical feature has only 1 category.
    + */
    +@Experimental
    +class DatasetIndexer(
    +    val maxCategories: Int,
    +    val ignoreUnrecognizedCategories: Boolean = true)
    +  extends Logging with Serializable {
    +
    +  require(maxCategories > 1,
    +    s"DatasetIndexer given maxCategories = $maxCategories, but requires maxCategories > 1.")
    +
    +  private class FeatureValueStats(val numFeatures: Int, val maxCategories: Int)
    +    extends Serializable {
    +
    +    val featureValueSets = Array.fill[OpenHashSet[Double]](numFeatures)(new OpenHashSet[Double]())
    +
    +    /**
    +     * Merge other [[FeatureValueStats]] into this instance, modifying this instance.
    +     * @param other  Other instance.  Not modified.
    +     * @return This instance
    +     */
    +    def merge(other: FeatureValueStats): FeatureValueStats = {
    +      featureValueSets.zip(other.featureValueSets).foreach { case (fvs1, fvs2) =>
    +        fvs2.iterator.foreach { val2 =>
    +          if (fvs1.size <= maxCategories) fvs1.add(val2)
    +        }
    +      }
    +      this
    +    }
    +
    +    def addDenseVector(dv: DenseVector): Unit = {
    +      var i = 0
    +      while (i < dv.size) {
    +        if (featureValueSets(i).size <= maxCategories) {
    +          featureValueSets(i).add(dv(i))
    +        }
    +        i += 1
    +      }
    +    }
    +
    +    def addSparseVector(sv: SparseVector): Unit = {
    +      // TODO: This could be made more efficient.
    +      var vecIndex = 0 // index into vector
    +      var nzIndex = 0 // index into non-zero elements
    +      while (vecIndex < sv.size) {
    +        val featureValue = if (nzIndex < sv.indices.size && vecIndex == sv.indices(nzIndex)) {
    +          nzIndex += 1
    +          sv.values(nzIndex - 1)
    +        } else {
    +          0.0
    +        }
    +        if (featureValueSets(vecIndex).size <= maxCategories) {
    +          featureValueSets(vecIndex).add(featureValue)
    +        }
    +        vecIndex += 1
    +      }
    +    }
    +
    +  }
    +
    +  /**
    +   * Array (over features) of sets of distinct feature values (up to maxCategories values).
    +   * Null values in array indicate feature has been determined to be continuous.
    +   *
    +   * Once the number of elements in a feature's set reaches maxCategories + 1,
    +   * then it is declared continuous, and we stop adding elements.
    +   */
    +  private var featureValueStats: Option[FeatureValueStats] = None
    +
    +  /**
    +   * Scans a dataset once and updates statistics about each column.
    +   * The statistics are used to choose categorical features and re-index them.
    +   *
    +   * Warning: Calling this on a new dataset changes the feature statistics and thus
    +   *          can change the behavior of [[transform]] and [[getCategoricalFeatureIndexes]].
    +   *          It is best to [[fit]] on all datasets before calling [[transform]] on any.
    +   *
    +   * @param data  Dataset with equal-length vectors.
    +   *              NOTE: A single instance of [[DatasetIndexer]] must always be given vectors of
    +   *              the same length.  If given non-matching vectors, this method will throw an error.
    --- End diff --
    
    Minor: extra space.
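
    For readers skimming the thread: the rule this diff implements is that a feature is categorical iff it accumulates at most maxCategories distinct values, and the indexer stops growing a feature's value set once it exceeds that bound. A minimal, Spark-free sketch of that rule (hypothetical helper, not part of the PR) might look like:

    ```scala
    import scala.collection.mutable

    object IndexerSketch {
      /** Returns featureIndex -> numCategories, for categorical features only. */
      def categoricalFeaturesInfo(
          data: Seq[Array[Double]],
          maxCategories: Int): Map[Int, Int] = {
        require(maxCategories > 1, s"requires maxCategories > 1, got $maxCategories")
        val numFeatures = data.head.length
        val valueSets = Array.fill(numFeatures)(mutable.Set.empty[Double])
        for (row <- data; i <- 0 until numFeatures) {
          // Mirror the diff: stop adding once a set already exceeds maxCategories,
          // which marks the feature as continuous.
          if (valueSets(i).size <= maxCategories) valueSets(i) += row(i)
        }
        valueSets.zipWithIndex.collect {
          case (vals, i) if vals.size <= maxCategories => i -> vals.size
        }.toMap
      }
    }
    ```

    For example, with maxCategories = 2 a feature observed with values {0.0, 1.0} comes back as categorical with 2 categories, while a feature with three or more distinct values is dropped from the map, i.e. treated as continuous.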


