Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/2819#discussion_r19454191
--- Diff: python/pyspark/mllib/feature.py ---
@@ -18,59 +18,348 @@
"""
Python package for feature in MLlib.
"""
+import sys
+import warnings
+
+import py4j.protocol
+from py4j.protocol import Py4JJavaError
+from py4j.java_gateway import JavaObject
+
+from pyspark import RDD, SparkContext
from pyspark.serializers import PickleSerializer, AutoBatchedSerializer
-from pyspark.mllib.linalg import _convert_to_vector, _to_java_object_rdd
+from pyspark.mllib.linalg import Vectors, _to_java_object_rdd
+
+__all__ = ['Normalizer', 'StandardScalerModel', 'StandardScaler',
+ 'HashingTF', 'IDFModel', 'IDF',
+ 'Word2Vec', 'Word2VecModel']
+
+
+# Hack to support float('inf') in Py4J
+old_smart_decode = py4j.protocol.smart_decode
+
+float_str_mapping = {
+ u'nan': u'NaN',
+ u'inf': u'Infinity',
+ u'-inf': u'-Infinity',
+}
+
+
+def new_smart_decode(obj):
+ if isinstance(obj, float):
+ s = unicode(obj)
+ return float_str_mapping.get(s, s)
+ return old_smart_decode(obj)
+
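+# Monkey-patch Py4J so that float('nan')/float('inf')/float('-inf') are
+# encoded as "NaN"/"Infinity"/"-Infinity", which the JVM side can parse.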
+py4j.protocol.smart_decode = new_smart_decode
+
+
+# TODO: move these helper functions into utils
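+# Names of JVM classes whose instances SerDe can pickle back into Python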
+_picklable_classes = [
+ 'LinkedList',
+ 'SparseVector',
+ 'DenseVector',
+ 'DenseMatrix',
+ 'Rating',
+ 'LabeledPoint',
+]
+
+
+def _py2java(sc, a):
+ """ Convert Python object into Java """
+ if isinstance(a, RDD):
+ a = _to_java_object_rdd(a)
+ elif not isinstance(a, (int, long, float, bool, basestring)):
+ bytes = bytearray(PickleSerializer().dumps(a))
+ a = sc._jvm.SerDe.loads(bytes)
+ return a
+
+
+def _java2py(sc, r):
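+ """ Convert a Java object into a Python object """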
+ if isinstance(r, JavaObject):
+ clsName = r.getClass().getSimpleName()
+ if clsName in ("RDD", "JavaRDD"):
+ if clsName == "RDD":
+ r = r.toJavaRDD()
+ jrdd = sc._jvm.SerDe.javaToPython(r)
+ return RDD(jrdd, sc, AutoBatchedSerializer(PickleSerializer()))
-__all__ = ['Word2Vec', 'Word2VecModel']
+ elif clsName in _picklable_classes:
+ r = sc._jvm.SerDe.dumps(r)
+ if isinstance(r, bytearray):
+ r = PickleSerializer().loads(str(r))
+ return r
-class Word2VecModel(object):
+
+def _callJavaFunc(sc, func, *args):
+ """ Call Java Function
"""
- class for Word2Vec model
+ args = [_py2java(sc, a) for a in args]
+ return _java2py(sc, func(*args))
+
+
+def _callAPI(sc, name, *args):
+ """ Call API in PythonMLLibAPI
"""
- def __init__(self, sc, java_model):
+ api = getattr(sc._jvm.PythonMLLibAPI(), name)
+ return _callJavaFunc(sc, api, *args)
+
+
+class VectorTransformer(object):
+ """
+ :: DeveloperApi ::
+ Base class for transformations of a vector or an RDD of vectors.
+ """
+ def transform(self, vector):
"""
- :param sc: Spark context
- :param java_model: Handle to Java model object
+ Applies the transformation on a vector.
+
+ :param vector: vector to be transformed.
"""
+ raise NotImplementedError
+
+
+class Normalizer(VectorTransformer):
+ """
+ :: Experimental ::
+ Normalizes samples individually to unit L^p^ norm
+
+ For any 1 <= p < float('inf'), normalizes samples using
+ sum(abs(vector).^p^)^(1/p)^ as norm.
+
+ For p = float('inf'), max(abs(vector)) will be used as norm for
+ normalization.
+
+ >>> v = Vectors.dense(range(3))
+ >>> nor = Normalizer(1)
+ >>> nor.transform(v)
+ DenseVector([0.0, 0.3333, 0.6667])
+
+ >>> rdd = sc.parallelize([v])
+ >>> nor.transform(rdd).collect()
+ [DenseVector([0.0, 0.3333, 0.6667])]
+
+ >>> nor2 = Normalizer(float("inf"))
+ >>> nor2.transform(v)
+ DenseVector([0.0, 0.5, 1.0])
+ """
+ def __init__(self, p=2):
+ """
+ :param p: Normalization in L^p^ space, p = 2 by default.
+ """
+ assert p >= 1.0, "p should be greater than or equal to 1.0"
+ self.p = float(p)
+
+ def transform(self, vector):
+ """
+ Applies unit length normalization on a vector.
+
+ :param vector: vector to be normalized.
+ :return: normalized vector. If the norm of the input is zero, it
+ will return the input vector.
+ """
+ sc = SparkContext._active_spark_context
+ assert sc is not None, "SparkContext should be initialized first"
+ return _callAPI(sc, "normalizeVector", self.p, vector)
+
+
+class JavaModelWrapper(VectorTransformer):
+ """
+ Wrapper for a model object in the JVM.
+ """
+ def __init__(self, sc, java_model):
self._sc = sc
self._java_model = java_model
def __del__(self):
self._sc._gateway.detach(self._java_model)
- def transform(self, word):
+ def transform(self, dataset):
+ return _callJavaFunc(self._sc, self._java_model.transform, dataset)
+
+
+class StandardScalerModel(JavaModelWrapper):
+ """
+ :: Experimental ::
+ Represents a StandardScaler model that can transform vectors.
+ """
+ def transform(self, vector):
"""
- :param word: a word
- :return: vector representation of word
+ Applies standardization transformation on a vector.
+
+ :param vector: Vector to be standardized.
+ :return: Standardized vector. If the variance of a column is zero,
+ it will return default `0.0` for the column with zero variance.
+ """
+ return JavaModelWrapper.transform(self, vector)
+
+
+class StandardScaler(object):
+ """
+ :: Experimental ::
+ Standardizes features by removing the mean and scaling to unit
+ variance using column summary statistics on the samples in the
+ training set.
+
+ >>> vs = [Vectors.dense([-2.0, 2.3, 0]), Vectors.dense([3.8, 0.0, 1.9])]
+ >>> dataset = sc.parallelize(vs)
+ >>> standardizer = StandardScaler(True, True)
+ >>> model = standardizer.fit(dataset)
+ >>> result = model.transform(dataset)
+ >>> for r in result.collect(): r
+ DenseVector([-0.7071, 0.7071, -0.7071])
+ DenseVector([0.7071, -0.7071, 0.7071])
+ """
+ def __init__(self, withMean=False, withStd=True):
+ """
+ :param withMean: False by default. Centers the data with mean
+ before scaling. It will build a dense output, so this
+ does not work on sparse input and will raise an exception.
+ :param withStd: True by default. Scales the data to unit standard
+ deviation.
+ """
+ if not (withMean or withStd):
+ warnings.warn("Both withMean and withStd are false. The model does nothing.")
+ self.withMean = withMean
+ self.withStd = withStd
+
+ def fit(self, dataset):
+ """
+ Computes the mean and variance and stores as a model to be used
+ for later scaling.
+
+ :param dataset: The data used to compute the mean and variance
+ to build the transformation model.
+ :return: a StandardScalerModel
+ """
+ sc = dataset.context
+ jmodel = _callAPI(sc, "fitStandardScaler", self.withMean,
+ self.withStd, dataset)
+ return StandardScalerModel(sc, jmodel)
+
+
+class HashingTF(object):
+ """
+ :: Experimental ::
+ Maps a sequence of terms to their term frequencies using the
+ hashing trick.
+
+ >>> htf = HashingTF(100)
+ >>> doc = "a a b b c d".split(" ")
+ >>> htf.transform(doc)
+ SparseVector(100, {1: 1.0, 14: 1.0, 31: 2.0, 44: 2.0})
+ """
+ def __init__(self, numFeatures=1 << 20):
+ """
+ :param numFeatures: number of features (default: 2^20)
+ """
+ self.numFeatures = numFeatures
+
+ def indexOf(self, term):
+ """ Returns the index of the input term. """
+ return hash(term) % self.numFeatures
+
+ def transform(self, document):
+ """
+ Transforms the input document (list of terms) to a term frequency
+ vector, or transforms an RDD of documents to an RDD of term
+ frequency vectors.
+ """
+ if isinstance(document, RDD):
+ return document.map(self.transform)
+
+ freq = {}
+ for term in document:
+ i = self.indexOf(term)
+ freq[i] = freq.get(i, 0) + 1.0
+ return Vectors.sparse(self.numFeatures, freq.items())
+
+
+class IDFModel(JavaModelWrapper):
+ """
+ Represents an IDF model that can transform term frequency vectors.
+ """
+ def transform(self, dataset):
+ """
+ Transforms term frequency (TF) vectors to TF-IDF vectors.
+
+ If `minDocFreq` was set for the IDF calculation,
+ the terms which occur in fewer than `minDocFreq`
+ documents will have an entry of 0.
+
+ :param dataset: an RDD of term frequency vectors
+ :return: an RDD of TF-IDF vectors
+ """
+ return JavaModelWrapper.transform(self, dataset)
+
+
+class IDF(object):
+ """
+ :: Experimental ::
+ Inverse document frequency (IDF).
+
+ The standard formulation is used: `idf = log((m + 1) / (d(t) + 1))`,
+ where `m` is the total number of documents and `d(t)` is the number
+ of documents that contain term `t`.
+
+ This implementation supports filtering out terms which do not appear
+ in a minimum number of documents (controlled by the variable
+ `minDocFreq`).
+ For terms that are not in at least `minDocFreq` documents, the IDF
+ is set to 0, resulting in TF-IDFs of 0.
+
+ >>> n = 4
+ >>> freqs = [Vectors.sparse(n, (1, 3), (1.0, 2.0)),
+ ... Vectors.dense([0.0, 1.0, 2.0, 3.0]),
+ ... Vectors.sparse(n, [1], [1.0])]
+ >>> data = sc.parallelize(freqs)
+ >>> idf = IDF()
+ >>> model = idf.fit(data)
+ >>> tfidf = model.transform(data)
+ >>> for r in tfidf.collect(): r
+ SparseVector(4, {1: 0.0, 3: 0.5754})
+ DenseVector([0.0, 0.0, 1.3863, 0.863])
+ SparseVector(4, {1: 0.0})
+ """
+ def __init__(self, minDocFreq=0):
+ """
+ :param minDocFreq: minimum number of documents in which a term
+ should appear for filtering
+ """
+ self.minDocFreq = minDocFreq
+
+ def fit(self, dataset):
+ """
+ Computes the inverse document frequency.
+
+ :param dataset: an RDD of term frequency vectors
+ """
+ sc = dataset.context
+ jmodel = _callAPI(sc, "fitIDF", self.minDocFreq, dataset)
+ return IDFModel(sc, jmodel)
+
+
+class Word2VecModel(JavaModelWrapper):
+ """
+ class for Word2Vec model
+ """
+ def transform(self, word):
+ """
Transforms a word to its vector representation
- Note: local use only
--- End diff ---
By local use, we mean it doesn't work in a closure:
~~~
rdd.map(model.transform)
~~~
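
To make it concrete, here is a minimal sketch of the failure (assuming a fitted `model`): the wrapper holds a Py4J handle to a JVM object in the driver, and that handle cannot be pickled into a task closure:
~~~
# model._java_model is a py4j JavaObject living in the driver's JVM.
# Shipping the closure below to executors means pickling that handle,
# which fails:
rdd.map(model.transform).collect()

# Calling on the driver works, since the Py4J gateway is available there:
vec = model.transform("word")
~~~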
I think we should keep the note.