Github user davies commented on a diff in the pull request:

    https://github.com/apache/spark/pull/2819#discussion_r19455072
  
    --- Diff: python/pyspark/mllib/feature.py ---
    @@ -18,59 +18,348 @@
     """
     Python package for feature in MLlib.
     """
    +import sys
    +import warnings
    +
    +import py4j.protocol
    +from py4j.protocol import Py4JJavaError
    +from py4j.java_gateway import JavaObject
    +
    +from pyspark import RDD, SparkContext
     from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
    -from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
    +from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
    +
    +__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
    +           'HashTF', 'IDFModel', 'IDF',
    +           'Word2Vec', 'Word2VecModel']
    +
    +
    +# Hack for support float('inf') in Py4j
    +old_smart_decode = py4j.protocol.smart_decode
    +
    +float_str_mapping = {
    +    u'nan': u'NaN',
    +    u'inf': u'Infinity',
    +    u'-inf': u'-Infinity',
    +}
    +
    +
    +def new_smart_decode(obj):
    +    if isinstance(obj, float):
    +        s = unicode(obj)
    +        return float_str_mapping.get(s, s)
    +    return old_smart_decode(obj)
    +
    +py4j.protocol.smart_decode = new_smart_decode
    +
    +
    +# TODO: move these helper functions into utils
    +_picklable_classes = [
    +    'LinkedList',
    +    'SparseVector',
    +    'DenseVector',
    +    'DenseMatrix',
    +    'Rating',
    +    'LabeledPoint',
    +]
    +
    +
    +def _py2java(sc, a):
    +    """ Convert Python object into Java """
    +    if isinstance(a, RDD):
    +        a = _to_java_object_rdd(a)
    +    elif not isinstance(a, (int, long, float, bool, basestring)):
    +        bytes = bytearray(PickleSerializer().dumps(a))
    +        a = sc._jvm.SerDe.loads(bytes)
    +    return a
    +
    +
    +def _java2py(sc, r):
    +    if isinstance(r, JavaObject):
    +        clsName = r.getClass().getSimpleName()
    +        if clsName in ("RDD", "JavaRDD"):
    +            if clsName == "RDD":
    +                r = r.toJavaRDD()
    +            jrdd = sc._jvm.SerDe.javaToPython(r)
    +            return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
     
    -__all__ = ['Word2Vec', 'Word2VecModel']
    +        elif clsName in _picklable_classes:
    +            r = sc._jvm.SerDe.dumps(r)
     
    +    if isinstance(r, bytearray):
    +        r = PickleSerializer().loads(str(r))
    +    return r
     
    -class Word2VecModel(object):
    +
    +def _callJavaFunc(sc, func, *args):
    +    """ Call Java Function
         """
    -    class for Word2Vec model
    +    args = [_py2java(sc, a) for a in args]
    +    return _java2py(sc, func(*args))
    +
    +
    +def _callAPI(sc, name, *args):
    +    """ Call API in PythonMLLibAPI
         """
    -    def __init__(self, sc, java_model):
    +    api = getattr(sc._jvm.PythonMLLibAPI(), name)
    +    return _callJavaFunc(sc, api, *args)
    +
    +
    +class VectorTransformer(object):
    +    """
    +    :: DeveloperApi ::
    +    Base class for transformation of a vector or RDD of vector
    +    """
    +    def transform(self, vector):
             """
    -        :param sc:  Spark context
    -        :param java_model:  Handle to Java model object
    +        Applies transformation on a vector.
    +
    +        :param vector: vector to be transformed.
             """
    +        raise NotImplementedError
    +
    +
    +class Normalizer(VectorTransformer):
    +    """
    +    :: Experimental ::
    +    Normalizes samples individually to unit L^p^ norm
    +
    +    For any 1 <= p <= float('inf'), normalizes samples using
    +    sum(abs(vector).^p^)^(1/p)^ as norm.
    +
    +    For p = float('inf'), max(abs(vector)) will be used as norm for 
normalization.
    +
    +    >>> v = Vectors.dense(range(3))
    +    >>> nor = Normalizer(1)
    +    >>> nor.transform(v)
    +    DenseVector([0.0, 0.3333, 0.6667])
    +
    +    >>> rdd = sc.parallelize([v])
    +    >>> nor.transform(rdd).collect()
    +    [DenseVector([0.0, 0.3333, 0.6667])]
    +
    +    >>> nor2 = Normalizer(float("inf"))
    +    >>> nor2.transform(v)
    +    DenseVector([0.0, 0.5, 1.0])
    +    """
    +    def __init__(self, p=2):
    +        """
    +        :param p: Normalization in L^p^ space, p = 2 by default.
    +        """
    +        assert p >= 1.0, "p should be greater than 1.0"
    +        self.p = float(p)
    +
    +    def transform(self, vector):
    +        """
    +        Applies unit length normalization on a vector.
    +
    +        :param vector: vector to be normalized.
    +        :return: normalized vector. If the norm of the input is zero, it
    +                will return the input vector.
    +        """
    +        sc = SparkContext._active_spark_context
    +        assert sc is not None, "SparkContext should be initialized first"
    +        return _callAPI(sc, "normalizeVector", self.p, vector)
    +
    +
    +class JavaModelWrapper(VectorTransformer):
    +    """
    +    Wrapper for the model in JVM
    +    """
    +    def __init__(self, sc, java_model):
             self._sc = sc
             self._java_model = java_model
     
         def __del__(self):
             self._sc._gateway.detach(self._java_model)
     
    -    def transform(self, word):
    +    def transform(self, dataset):
    +        return _callJavaFunc(self._sc, self._java_model.transform, dataset)
    +
    +
    +class StandardScalerModel(JavaModelWrapper):
    +    """
    +    :: Experimental ::
    +    Represents a StandardScaler model that can transform vectors.
    +    """
    +    def transform(self, vector):
             """
    -        :param word: a word
    -        :return: vector representation of word
    +        Applies standardization transformation on a vector.
    +
    +        :param vector: Vector to be standardized.
    +        :return: Standardized vector. If the variance of a column is zero,
    +                it will return default `0.0` for the column with zero 
variance.
    +        """
    +        return JavaModelWrapper.transform(self, vector)
    +
     
    +class StandardScaler(object):
    +    """
    +    :: Experimental ::
    +    Standardizes features by removing the mean and scaling to unit
    +    variance using column summary statistics on the samples in the
    +    training set.
    +
    +    >>> vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 
1.9])]
    +    >>> dataset = sc.parallelize(vs)
    +    >>> standardizer = StandardScaler(True, True)
    +    >>> model = standardizer.fit(dataset)
    +    >>> result = model.transform(dataset)
    +    >>> for r in result.collect(): r
    +    DenseVector([-0.7071, 0.7071, -0.7071])
    +    DenseVector([0.7071, -0.7071, 0.7071])
    +    """
    +    def __init__(self, withMean=False, withStd=True):
    +        """
    +        :param withMean: False by default. Centers the data with mean
    +                 before scaling. It will build a dense output, so this
    +                 does not work on sparse input and will raise an exception.
    +        :param withStd: True by default. Scales the data to unit standard
    +                 deviation.
    +        """
    +        if not (withMean or withStd):
    +            warnings.warn("Both withMean and withStd are false. The model 
does nothing.")
    +        self.withMean = withMean
    +        self.withStd = withStd
    +
    +    def fit(self, dataset):
    +        """
    +        Computes the mean and variance and stores as a model to be used 
for later scaling.
    +
    +        :param data: The data used to compute the mean and variance to 
build
    +                    the transformation model.
    +        :return: a StandardScalarModel
    +        """
    +        sc = dataset.context
    +        jmodel = _callAPI(sc, "fitStandardScaler", self.withMean, 
self.withStd, dataset)
    +        return StandardScalerModel(sc, jmodel)
    +
    +
    +class HashingTF(object):
    +    """
    +    :: Experimental ::
    +    Maps a sequence of terms to their term frequencies using the hashing 
trick.
    +
    +    >>> htf = HashingTF(100)
    +    >>> doc = "a a b b c d".split(" ")
    +    >>> htf.transform(doc)
    +    SparseVector(100, {1: 1.0, 14: 1.0, 31: 2.0, 44: 2.0})
    +    """
    +    def __init__(self, numFeatures=1 << 20):
    +        """
    +        :param numFeatures: number of features (default: 2^20)
    +        """
    +        self.numFeatures = numFeatures
    +
    +    def indexOf(self, term):
    +        """ Returns the index of the input term. """
    +        return hash(term) % self.numFeatures
    --- End diff --
    
    `term` can be of any type, so it is very hard to produce the same hash as Scala.
    
    PS: In Python, mutable objects (such as dict, set, and list) are not hashable; 
should we support these types for `term`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to