This is an automated email from the ASF dual-hosted git repository. meng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new fe75ff8 [SPARK-28206][PYTHON] Remove the legacy Epydoc in PySpark API documentation fe75ff8 is described below commit fe75ff8bea3330a10aba1a61f3aba42e541195a8 Author: HyukjinKwon <gurwls...@apache.org> AuthorDate: Fri Jul 5 10:08:22 2019 -0700 [SPARK-28206][PYTHON] Remove the legacy Epydoc in PySpark API documentation ## What changes were proposed in this pull request? Seems like we used to generate PySpark API documentation by Epydoc almost at the very first place (see https://github.com/apache/spark/commit/85b8f2c64f0fc4be5645d8736629fc082cb3587b). This fixes an actual issue: Before: ![Screen Shot 2019-07-05 at 8 20 01 PM](https://user-images.githubusercontent.com/6477701/60720491-e9879180-9f65-11e9-9562-100830a456cd.png) After: ![Screen Shot 2019-07-05 at 8 20 05 PM](https://user-images.githubusercontent.com/6477701/60720495-ec828200-9f65-11e9-8277-8f689e292cb0.png) It seems apparently a bug within the `epytext` plugin during the conversion between `param` and `:param` syntax. See also [Epydoc syntax](http://epydoc.sourceforge.net/manual-epytext.html). Actually, Epydoc syntax violates [PEP-257](https://www.python.org/dev/peps/pep-0257/) IIRC and blocks us from enabling some rules for the doctest linter as well. We should remove this legacy and I guess Spark 3 is good timing to do it. ## How was this patch tested? Manually built the doc and checked each. I had to manually find the Epydoc syntax by `git grep -r "{L"`, for instance. Closes #25060 from HyukjinKwon/SPARK-28206. 
Authored-by: HyukjinKwon <gurwls...@apache.org> Signed-off-by: Xiangrui Meng <m...@databricks.com> --- python/docs/conf.py | 1 - python/docs/epytext.py | 30 -------- python/pyspark/accumulators.py | 14 ++-- python/pyspark/broadcast.py | 6 +- python/pyspark/conf.py | 8 +-- python/pyspark/context.py | 56 +++++++-------- python/pyspark/files.py | 7 +- python/pyspark/ml/feature.py | 2 +- python/pyspark/ml/linalg/__init__.py | 8 +-- python/pyspark/mllib/classification.py | 4 +- python/pyspark/mllib/clustering.py | 6 +- python/pyspark/mllib/linalg/__init__.py | 8 +-- python/pyspark/mllib/random.py | 6 +- python/pyspark/mllib/stat/_statistics.py | 4 +- python/pyspark/mllib/util.py | 4 +- python/pyspark/rdd.py | 114 +++++++++++++++---------------- python/pyspark/serializers.py | 12 ++-- python/pyspark/sql/dataframe.py | 10 +-- python/pyspark/sql/types.py | 2 +- python/pyspark/streaming/context.py | 42 ++++++------ python/pyspark/streaming/dstream.py | 50 +++++++------- python/pyspark/taskcontext.py | 2 +- python/pyspark/testing/streamingutils.py | 6 +- 23 files changed, 185 insertions(+), 217 deletions(-) diff --git a/python/docs/conf.py b/python/docs/conf.py index f507ee3..9e7afb7 100644 --- a/python/docs/conf.py +++ b/python/docs/conf.py @@ -31,7 +31,6 @@ needs_sphinx = '1.2' extensions = [ 'sphinx.ext.autodoc', 'sphinx.ext.viewcode', - 'epytext', 'sphinx.ext.mathjax', ] diff --git a/python/docs/epytext.py b/python/docs/epytext.py deleted file mode 100644 index 4bbbf65..0000000 --- a/python/docs/epytext.py +++ /dev/null @@ -1,30 +0,0 @@ -import re - -RULES = ( - (r"<(!BLANKLINE)[\w.]+>", r""), - (r"L{([\w.()]+)}", r":class:`\1`"), - (r"[LC]{(\w+\.\w+)\(\)}", r":func:`\1`"), - (r"C{([\w.()]+)}", r":class:`\1`"), - (r"[IBCM]{([^}]+)}", r"`\1`"), - ('pyspark.rdd.RDD', 'RDD'), -) - - -def _convert_epytext(line): - """ - >>> _convert_epytext("L{A}") - :class:`A` - """ - line = line.replace('@', ':') - for p, sub in RULES: - line = re.sub(p, sub, line) - return line - - -def 
_process_docstring(app, what, name, obj, options, lines): - for i in range(len(lines)): - lines[i] = _convert_epytext(lines[i]) - - -def setup(app): - app.connect("autodoc-process-docstring", _process_docstring) diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py index 00ec094..a5d5132 100644 --- a/python/pyspark/accumulators.py +++ b/python/pyspark/accumulators.py @@ -123,13 +123,13 @@ class Accumulator(object): """ A shared variable that can be accumulated, i.e., has a commutative and associative "add" - operation. Worker tasks on a Spark cluster can add values to an Accumulator with the C{+=} - operator, but only the driver program is allowed to access its value, using C{value}. + operation. Worker tasks on a Spark cluster can add values to an Accumulator with the `+=` + operator, but only the driver program is allowed to access its value, using `value`. Updates from the workers get propagated automatically to the driver program. - While C{SparkContext} supports accumulators for primitive data types like C{int} and - C{float}, users can also define accumulators for custom types by providing a custom - L{AccumulatorParam} object. Refer to the doctest of this module for an example. + While :class:`SparkContext` supports accumulators for primitive data types like :class:`int` and + :class:`float`, users can also define accumulators for custom types by providing a custom + :class:`AccumulatorParam` object. Refer to the doctest of this module for an example. """ def __init__(self, aid, value, accum_param): @@ -185,14 +185,14 @@ class AccumulatorParam(object): def zero(self, value): """ Provide a "zero value" for the type, compatible in dimensions with the - provided C{value} (e.g., a zero vector) + provided `value` (e.g., a zero vector) """ raise NotImplementedError def addInPlace(self, value1, value2): """ Add two values of the accumulator's data type, returning a new value; - for efficiency, can also update C{value1} in place and return it. 
+ for efficiency, can also update `value1` in place and return it. """ raise NotImplementedError diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py index cca64b5..a97d409 100644 --- a/python/pyspark/broadcast.py +++ b/python/pyspark/broadcast.py @@ -49,8 +49,8 @@ def _from_id(bid): class Broadcast(object): """ - A broadcast variable created with L{SparkContext.broadcast()}. - Access its value through C{.value}. + A broadcast variable created with :meth:`SparkContext.broadcast`. + Access its value through :attr:`value`. Examples: @@ -69,7 +69,7 @@ class Broadcast(object): def __init__(self, sc=None, value=None, pickle_registry=None, path=None, sock_file=None): """ - Should not be called directly by users -- use L{SparkContext.broadcast()} + Should not be called directly by users -- use :meth:`SparkContext.broadcast` instead. """ if sc is not None: diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py index ab429d9..2024260 100644 --- a/python/pyspark/conf.py +++ b/python/pyspark/conf.py @@ -79,16 +79,16 @@ class SparkConf(object): parameters as key-value pairs. Most of the time, you would create a SparkConf object with - C{SparkConf()}, which will load values from C{spark.*} Java system + ``SparkConf()``, which will load values from `spark.*` Java system properties as well. In this case, any parameters you set directly on - the C{SparkConf} object take priority over system properties. + the :class:`SparkConf` object take priority over system properties. - For unit tests, you can also call C{SparkConf(false)} to skip + For unit tests, you can also call ``SparkConf(false)`` to skip loading external settings and get the same configuration no matter what the system properties are. All setter methods in this class support chaining. For example, - you can write C{conf.setMaster("local").setAppName("My app")}. + you can write ``conf.setMaster("local").setAppName("My app")``. .. 
note:: Once a SparkConf object is passed to Spark, it is cloned and can no longer be modified by the user. diff --git a/python/pyspark/context.py b/python/pyspark/context.py index a835298..69020e6 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -61,7 +61,7 @@ class SparkContext(object): """ Main entry point for Spark functionality. A SparkContext represents the - connection to a Spark cluster, and can be used to create L{RDD} and + connection to a Spark cluster, and can be used to create :class:`RDD` and broadcast variables on that cluster. .. note:: Only one :class:`SparkContext` should be active per JVM. You must `stop()` @@ -86,7 +86,7 @@ class SparkContext(object): gateway=None, jsc=None, profiler_cls=BasicProfiler): """ Create a new SparkContext. At least the master and app name should be set, - either through the named parameters here or through C{conf}. + either through the named parameters here or through `conf`. :param master: Cluster URL to connect to (e.g. mesos://host:port, spark://host:port, local[4]). @@ -102,7 +102,7 @@ class SparkContext(object): the batch size based on object sizes, or -1 to use an unlimited batch size :param serializer: The serializer for RDDs. - :param conf: A L{SparkConf} object setting Spark properties. + :param conf: A :class:`SparkConf` object setting Spark properties. :param gateway: Use an existing gateway and JVM, otherwise a new JVM will be instantiated. :param jsc: The JavaSparkContext instance (optional). @@ -576,7 +576,7 @@ class SparkContext(object): def pickleFile(self, name, minPartitions=None): """ - Load an RDD previously saved using L{RDD.saveAsPickleFile} method. + Load an RDD previously saved using :meth:`RDD.saveAsPickleFile` method. >>> tmpFile = NamedTemporaryFile(delete=True) >>> tmpFile.close() @@ -624,20 +624,24 @@ class SparkContext(object): as `utf-8`), which is faster and smaller than unicode. 
(Added in Spark 1.2) - For example, if you have the following files:: + For example, if you have the following files: - hdfs://a-hdfs-path/part-00000 - hdfs://a-hdfs-path/part-00001 - ... - hdfs://a-hdfs-path/part-nnnnn + .. code-block:: text - Do C{rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")}, - then C{rdd} contains:: + hdfs://a-hdfs-path/part-00000 + hdfs://a-hdfs-path/part-00001 + ... + hdfs://a-hdfs-path/part-nnnnn + + Do ``rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")``, + then ``rdd`` contains: - (a-hdfs-path/part-00000, its content) - (a-hdfs-path/part-00001, its content) - ... - (a-hdfs-path/part-nnnnn, its content) + .. code-block:: text + + (a-hdfs-path/part-00000, its content) + (a-hdfs-path/part-00001, its content) + ... + (a-hdfs-path/part-nnnnn, its content) .. note:: Small files are preferred, as each file will be loaded fully in memory. @@ -705,7 +709,7 @@ class SparkContext(object): and value Writable classes 2. Serialization is attempted via Pyrolite pickling 3. If this fails, the fallback is to call 'toString' on each key and value - 4. C{PickleSerializer} is used to deserialize pickled objects on the Python side + 4. :class:`PickleSerializer` is used to deserialize pickled objects on the Python side :param path: path to sequncefile :param keyClass: fully qualified classname of key Writable class @@ -872,8 +876,7 @@ class SparkContext(object): def broadcast(self, value): """ - Broadcast a read-only variable to the cluster, returning a - L{Broadcast<pyspark.broadcast.Broadcast>} + Broadcast a read-only variable to the cluster, returning a :class:`Broadcast` object for reading it in distributed functions. The variable will be sent to each cluster only once. 
""" @@ -881,8 +884,8 @@ class SparkContext(object): def accumulator(self, value, accum_param=None): """ - Create an L{Accumulator} with the given initial value, using a given - L{AccumulatorParam} helper object to define how to add values of the + Create an :class:`Accumulator` with the given initial value, using a given + :class:`AccumulatorParam` helper object to define how to add values of the data type if provided. Default AccumulatorParams are used for integers and floating-point numbers if you do not provide one. For other types, a custom AccumulatorParam can be used. @@ -902,12 +905,11 @@ class SparkContext(object): def addFile(self, path, recursive=False): """ Add a file to be downloaded with this Spark job on every node. - The C{path} passed can be either a local file, a file in HDFS + The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI. - To access the file in Spark jobs, use - L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>} with the + To access the file in Spark jobs, use :meth:`SparkFiles.get` with the filename to find its download location. A directory can be given if the recursive option is set to True. @@ -932,7 +934,7 @@ class SparkContext(object): def addPyFile(self, path): """ Add a .py or .zip dependency for all tasks to be executed on this - SparkContext in the future. The C{path} passed can be either a local + SparkContext in the future. The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI. @@ -978,7 +980,7 @@ class SparkContext(object): Application programmers can use this method to group all those jobs together and give a group description. Once set, the Spark web UI will associate such jobs with this group. 
- The application can use L{SparkContext.cancelJobGroup} to cancel all + The application can use :meth:`SparkContext.cancelJobGroup` to cancel all running jobs in this group. >>> import threading @@ -1023,7 +1025,7 @@ class SparkContext(object): def getLocalProperty(self, key): """ Get a local property set in this thread, or null if it is missing. See - L{setLocalProperty} + :meth:`setLocalProperty`. """ return self._jsc.getLocalProperty(key) @@ -1041,7 +1043,7 @@ class SparkContext(object): def cancelJobGroup(self, groupId): """ - Cancel active jobs for the specified group. See L{SparkContext.setJobGroup} + Cancel active jobs for the specified group. See :meth:`SparkContext.setJobGroup`. for more information. """ self._jsc.sc().cancelJobGroup(groupId) diff --git a/python/pyspark/files.py b/python/pyspark/files.py index 797573f..c08db41 100644 --- a/python/pyspark/files.py +++ b/python/pyspark/files.py @@ -24,8 +24,7 @@ __all__ = ['SparkFiles'] class SparkFiles(object): """ - Resolves paths to files added through - L{SparkContext.addFile()<pyspark.context.SparkContext.addFile>}. + Resolves paths to files added through :meth:`SparkContext.addFile`. SparkFiles contains only classmethods; users should not create SparkFiles instances. @@ -41,7 +40,7 @@ class SparkFiles(object): @classmethod def get(cls, filename): """ - Get the absolute path of a file added through C{SparkContext.addFile()}. + Get the absolute path of a file added through :meth:`SparkContext.addFile`. """ path = os.path.join(SparkFiles.getRootDirectory(), filename) return os.path.abspath(path) @@ -50,7 +49,7 @@ class SparkFiles(object): def getRootDirectory(cls): """ Get the root directory that contains files added through - C{SparkContext.addFile()}. + :meth:`SparkContext.addFile`. 
""" if cls._is_running_on_worker: return cls._root_directory diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py index 9827a2a..78d0269 100755 --- a/python/pyspark/ml/feature.py +++ b/python/pyspark/ml/feature.py @@ -2560,7 +2560,7 @@ class IndexToString(JavaTransformer, HasInputCol, HasOutputCol, JavaMLReadable, corresponding string values. The index-string mapping is either from the ML attributes of the input column, or from user-supplied labels (which take precedence over ML attributes). - See L{StringIndexer} for converting strings into indices. + See :class:`StringIndexer` for converting strings into indices. .. versionadded:: 1.6.0 """ diff --git a/python/pyspark/ml/linalg/__init__.py b/python/pyspark/ml/linalg/__init__.py index f6ddc09..a79d5e5 100644 --- a/python/pyspark/ml/linalg/__init__.py +++ b/python/pyspark/ml/linalg/__init__.py @@ -17,9 +17,9 @@ """ MLlib utilities for linear algebra. For dense vectors, MLlib -uses the NumPy C{array} type, so you can simply pass NumPy arrays -around. For sparse vectors, users can construct a L{SparseVector} -object from MLlib or pass SciPy C{scipy.sparse} column vectors if +uses the NumPy `array` type, so you can simply pass NumPy arrays +around. For sparse vectors, users can construct a :class:`SparseVector` +object from MLlib or pass SciPy `scipy.sparse` column vectors if SciPy is available in their environment. """ @@ -758,7 +758,7 @@ class Vectors(object): .. note:: Dense vectors are simply represented as NumPy array objects, so there is no need to covert them for use in MLlib. For sparse vectors, the factory methods in this class create an MLlib-compatible type, or users - can pass in SciPy's C{scipy.sparse} column vectors. + can pass in SciPy's `scipy.sparse` column vectors. 
""" @staticmethod diff --git a/python/pyspark/mllib/classification.py b/python/pyspark/mllib/classification.py index d2037be..c52da2a 100644 --- a/python/pyspark/mllib/classification.py +++ b/python/pyspark/mllib/classification.py @@ -659,11 +659,11 @@ class NaiveBayes(object): Train a Naive Bayes model given an RDD of (label, features) vectors. - This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which + This is the `Multinomial NB <http://tinyurl.com/lsdw6p>`_ which can handle all kinds of discrete data. For example, by converting documents into TF-IDF vectors, it can be used for document classification. By making every vector a 0-1 vector, - it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}). + it can also be used as `Bernoulli NB <http://tinyurl.com/p7c96j6>`_. The input feature values must be nonnegative. :param data: diff --git a/python/pyspark/mllib/clustering.py b/python/pyspark/mllib/clustering.py index 58da434..3524fcf 100644 --- a/python/pyspark/mllib/clustering.py +++ b/python/pyspark/mllib/clustering.py @@ -130,9 +130,9 @@ class BisectingKMeans(object): clusters, larger clusters get higher priority. Based on - U{http://glaros.dtc.umn.edu/gkhome/fetch/papers/docclusterKDDTMW00.pdf} - Steinbach, Karypis, and Kumar, A comparison of document clustering - techniques, KDD Workshop on Text Mining, 2000. + `Steinbach, Karypis, and Kumar, A comparison of document clustering + techniques, KDD Workshop on Text Mining, 2000 + <http://glaros.dtc.umn.edu/gkhome/fetch/papers/docclusterKDDTMW00.pdf>`_. .. versionadded:: 2.0.0 """ diff --git a/python/pyspark/mllib/linalg/__init__.py b/python/pyspark/mllib/linalg/__init__.py index df411d7..cd09621 100644 --- a/python/pyspark/mllib/linalg/__init__.py +++ b/python/pyspark/mllib/linalg/__init__.py @@ -17,9 +17,9 @@ """ MLlib utilities for linear algebra. For dense vectors, MLlib -uses the NumPy C{array} type, so you can simply pass NumPy arrays -around. 
For sparse vectors, users can construct a L{SparseVector} -object from MLlib or pass SciPy C{scipy.sparse} column vectors if +uses the NumPy `array` type, so you can simply pass NumPy arrays +around. For sparse vectors, users can construct a :class:`SparseVector` +object from MLlib or pass SciPy `scipy.sparse` column vectors if SciPy is available in their environment. """ @@ -847,7 +847,7 @@ class Vectors(object): .. note:: Dense vectors are simply represented as NumPy array objects, so there is no need to covert them for use in MLlib. For sparse vectors, the factory methods in this class create an MLlib-compatible type, or users - can pass in SciPy's C{scipy.sparse} column vectors. + can pass in SciPy's `scipy.sparse` column vectors. """ @staticmethod diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py index a8833cb..6106c58 100644 --- a/python/pyspark/mllib/random.py +++ b/python/pyspark/mllib/random.py @@ -54,8 +54,7 @@ class RandomRDDs(object): To transform the distribution in the generated RDD from U(0.0, 1.0) to U(a, b), use - C{RandomRDDs.uniformRDD(sc, n, p, seed)\ - .map(lambda v: a + (b - a) * v)} + ``RandomRDDs.uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * v)`` :param sc: SparkContext used to create the RDD. :param size: Size of the RDD. @@ -85,8 +84,7 @@ class RandomRDDs(object): To transform the distribution in the generated RDD from standard normal to some other normal N(mean, sigma^2), use - C{RandomRDDs.normal(sc, n, p, seed)\ - .map(lambda v: mean + sigma * v)} + ``RandomRDDs.normal(sc, n, p, seed).map(lambda v: mean + sigma * v)`` :param sc: SparkContext used to create the RDD. :param size: Size of the RDD. 
diff --git a/python/pyspark/mllib/stat/_statistics.py b/python/pyspark/mllib/stat/_statistics.py index 6e89bfd..d49f741 100644 --- a/python/pyspark/mllib/stat/_statistics.py +++ b/python/pyspark/mllib/stat/_statistics.py @@ -98,10 +98,10 @@ class Statistics(object): """ Compute the correlation (matrix) for the input RDD(s) using the specified method. - Methods currently supported: I{pearson (default), spearman}. + Methods currently supported: `pearson (default), spearman`. If a single RDD of Vectors is passed in, a correlation matrix - comparing the columns in the input RDD is returned. Use C{method=} + comparing the columns in the input RDD is returned. Use `method` to specify the method to be used for single RDD inout. If two RDDs of floats are passed in, a single float is returned. diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py index 0190bf3..1a0ce42 100644 --- a/python/pyspark/mllib/util.py +++ b/python/pyspark/mllib/util.py @@ -95,7 +95,7 @@ class MLUtils(object): which leads to inconsistent feature dimensions. :param minPartitions: min number of partitions - @return: labeled data stored as an RDD of LabeledPoint + :return: labeled data stored as an RDD of LabeledPoint >>> from tempfile import NamedTemporaryFile >>> from pyspark.mllib.util import MLUtils @@ -156,7 +156,7 @@ class MLUtils(object): :param path: file or directory path in any Hadoop-supported file system URI :param minPartitions: min number of partitions - @return: labeled data stored as an RDD of LabeledPoint + :return: labeled data stored as an RDD of LabeledPoint >>> from tempfile import NamedTemporaryFile >>> from pyspark.mllib.util import MLUtils diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 8edb7f3..8bcc67a 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -286,13 +286,13 @@ class RDD(object): @property def context(self): """ - The L{SparkContext} that this RDD was created on. 
+ The :class:`SparkContext` that this RDD was created on. """ return self.ctx def cache(self): """ - Persist this RDD with the default storage level (C{MEMORY_ONLY}). + Persist this RDD with the default storage level (`MEMORY_ONLY`). """ self.is_cached = True self.persist(StorageLevel.MEMORY_ONLY) @@ -303,7 +303,7 @@ class RDD(object): Set this RDD's storage level to persist its values across operations after the first time it is computed. This can only be used to assign a new storage level if the RDD does not have a storage level set yet. - If no storage level is specified defaults to (C{MEMORY_ONLY}). + If no storage level is specified defaults to (`MEMORY_ONLY`). >>> rdd = sc.parallelize(["b", "a", "c"]) >>> rdd.persist().is_cached @@ -330,7 +330,7 @@ class RDD(object): def checkpoint(self): """ Mark this RDD for checkpointing. It will be saved to a file inside the - checkpoint directory set with L{SparkContext.setCheckpointDir()} and + checkpoint directory set with :meth:`SparkContext.setCheckpointDir` and all references to its parent RDDs will be removed. This function must be called before any job has been executed on this RDD. It is strongly recommended that this RDD is persisted in memory, otherwise saving it @@ -360,9 +360,9 @@ class RDD(object): This is NOT safe to use with dynamic allocation, which removes executors along with their cached blocks. If you must use both features, you are advised to set - L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value. + `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value. - The checkpoint directory set through L{SparkContext.setCheckpointDir()} is not used. + The checkpoint directory set through :meth:`SparkContext.setCheckpointDir` is not used. 
""" self._jrdd.rdd().localCheckpoint() @@ -786,8 +786,8 @@ class RDD(object): def cartesian(self, other): """ Return the Cartesian product of this RDD and another one, that is, the - RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and - C{b} is in C{other}. + RDD of all pairs of elements ``(a, b)`` where ``a`` is in `self` and + ``b`` is in `other`. >>> rdd = sc.parallelize([1, 2]) >>> sorted(rdd.cartesian(rdd).collect()) @@ -960,9 +960,9 @@ class RDD(object): Aggregate the elements of each partition, and then the results for all the partitions, using a given associative function and a neutral "zero value." - The function C{op(t1, t2)} is allowed to modify C{t1} and return it + The function ``op(t1, t2)`` is allowed to modify ``t1`` and return it as its result value to avoid object allocation; however, it should not - modify C{t2}. + modify ``t2``. This behaves somewhat differently from fold operations implemented for non-distributed collections in functional languages like Scala. @@ -995,9 +995,9 @@ class RDD(object): the partitions, using a given combine functions and a neutral "zero value." - The functions C{op(t1, t2)} is allowed to modify C{t1} and return it + The functions ``op(t1, t2)`` is allowed to modify ``t1`` and return it as its result value to avoid object allocation; however, it should not - modify C{t2}. + modify ``t2``. The first function (seqOp) can return a different result type, U, than the type of this RDD. Thus, we need one operation for merging a T into @@ -1128,7 +1128,7 @@ class RDD(object): def stats(self): """ - Return a L{StatCounter} object that captures the mean, variance + Return a :class:`StatCounter` object that captures the mean, variance and count of the RDD's elements in one operation. 
""" def redFunc(left_counter, right_counter): @@ -1467,10 +1467,10 @@ class RDD(object): def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, valueConverter=None): """ - Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file system, using the new Hadoop OutputFormat API (mapreduce package). Keys/values are converted for output using either user specified converters or, by default, - L{org.apache.spark.api.python.JavaToWritableConverter}. + "org.apache.spark.api.python.JavaToWritableConverter". :param conf: Hadoop job configuration, passed in as a dict :param keyConverter: (None by default) @@ -1484,11 +1484,11 @@ class RDD(object): def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, valueClass=None, keyConverter=None, valueConverter=None, conf=None): """ - Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file system, using the new Hadoop OutputFormat API (mapreduce package). Key and value types will be inferred if not specified. Keys and values are converted for output using either - user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The - C{conf} is applied on top of the base Hadoop conf associated with the SparkContext + user specified converters or "org.apache.spark.api.python.JavaToWritableConverter". The + `conf` is applied on top of the base Hadoop conf associated with the SparkContext of this RDD to create a merged Hadoop MapReduce job configuration for saving the data. 
:param path: path to Hadoop file @@ -1511,10 +1511,10 @@ class RDD(object): def saveAsHadoopDataset(self, conf, keyConverter=None, valueConverter=None): """ - Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file system, using the old Hadoop OutputFormat API (mapred package). Keys/values are converted for output using either user specified converters or, by default, - L{org.apache.spark.api.python.JavaToWritableConverter}. + "org.apache.spark.api.python.JavaToWritableConverter". :param conf: Hadoop job configuration, passed in as a dict :param keyConverter: (None by default) @@ -1529,11 +1529,11 @@ class RDD(object): keyConverter=None, valueConverter=None, conf=None, compressionCodecClass=None): """ - Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file + Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file system, using the old Hadoop OutputFormat API (mapred package). Key and value types will be inferred if not specified. Keys and values are converted for output using either - user specified converters or L{org.apache.spark.api.python.JavaToWritableConverter}. The - C{conf} is applied on top of the base Hadoop conf associated with the SparkContext + user specified converters or "org.apache.spark.api.python.JavaToWritableConverter". The + `conf` is applied on top of the base Hadoop conf associated with the SparkContext of this RDD to create a merged Hadoop MapReduce job configuration for saving the data. 
:param path: path to Hadoop file @@ -1558,8 +1558,8 @@ class RDD(object): def saveAsSequenceFile(self, path, compressionCodecClass=None): """ - Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any Hadoop file - system, using the L{org.apache.hadoop.io.Writable} types that we convert from the + Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to any Hadoop file + system, using the "org.apache.hadoop.io.Writable" types that we convert from the RDD's key and value types. The mechanism is as follows: 1. Pyrolite is used to convert pickled Python RDD into RDD of Java objects. @@ -1575,7 +1575,7 @@ class RDD(object): def saveAsPickleFile(self, path, batchSize=10): """ Save this RDD as a SequenceFile of serialized objects. The serializer - used is L{pyspark.serializers.PickleSerializer}, default batch size + used is :class:`pyspark.serializers.PickleSerializer`, default batch size is 10. >>> tmpFile = NamedTemporaryFile(delete=True) @@ -1595,8 +1595,8 @@ class RDD(object): """ Save this RDD as a text file, using string representations of elements. - @param path: path to text file - @param compressionCodecClass: (None by default) string i.e. + :param path: path to text file + :param compressionCodecClass: (None by default) string i.e. "org.apache.hadoop.io.compress.GzipCodec" >>> tempFile = NamedTemporaryFile(delete=True) @@ -1685,8 +1685,8 @@ class RDD(object): This will also perform the merging locally on each mapper before sending results to a reducer, similarly to a "combiner" in MapReduce. - Output will be partitioned with C{numPartitions} partitions, or - the default parallelism level if C{numPartitions} is not specified. + Output will be partitioned with `numPartitions` partitions, or + the default parallelism level if `numPartitions` is not specified. Default partitioner is hash-partition. 
>>> from operator import add @@ -1737,10 +1737,10 @@ class RDD(object): def join(self, other, numPartitions=None): """ Return an RDD containing all pairs of elements with matching keys in - C{self} and C{other}. + `self` and `other`. Each pair of elements will be returned as a (k, (v1, v2)) tuple, where - (k, v1) is in C{self} and (k, v2) is in C{other}. + (k, v1) is in `self` and (k, v2) is in `other`. Performs a hash join across the cluster. @@ -1753,11 +1753,11 @@ class RDD(object): def leftOuterJoin(self, other, numPartitions=None): """ - Perform a left outer join of C{self} and C{other}. + Perform a left outer join of `self` and `other`. - For each element (k, v) in C{self}, the resulting RDD will either - contain all pairs (k, (v, w)) for w in C{other}, or the pair - (k, (v, None)) if no elements in C{other} have key k. + For each element (k, v) in `self`, the resulting RDD will either + contain all pairs (k, (v, w)) for w in `other`, or the pair + (k, (v, None)) if no elements in `other` have key k. Hash-partitions the resulting RDD into the given number of partitions. @@ -1770,11 +1770,11 @@ class RDD(object): def rightOuterJoin(self, other, numPartitions=None): """ - Perform a right outer join of C{self} and C{other}. + Perform a right outer join of `self` and `other`. - For each element (k, w) in C{other}, the resulting RDD will either + For each element (k, w) in `other`, the resulting RDD will either contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w)) - if no elements in C{self} have key k. + if no elements in `self` have key k. Hash-partitions the resulting RDD into the given number of partitions. @@ -1787,15 +1787,15 @@ class RDD(object): def fullOuterJoin(self, other, numPartitions=None): """ - Perform a right outer join of C{self} and C{other}. + Perform a right outer join of `self` and `other`. 
- For each element (k, v) in C{self}, the resulting RDD will either - contain all pairs (k, (v, w)) for w in C{other}, or the pair - (k, (v, None)) if no elements in C{other} have key k. + For each element (k, v) in `self`, the resulting RDD will either + contain all pairs (k, (v, w)) for w in `other`, or the pair + (k, (v, None)) if no elements in `other` have key k. - Similarly, for each element (k, w) in C{other}, the resulting RDD will - either contain all pairs (k, (v, w)) for v in C{self}, or the pair - (k, (None, w)) if no elements in C{self} have key k. + Similarly, for each element (k, w) in `other`, the resulting RDD will + either contain all pairs (k, (v, w)) for v in `self`, or the pair + (k, (None, w)) if no elements in `self` have key k. Hash-partitions the resulting RDD into the given number of partitions. @@ -1891,11 +1891,11 @@ class RDD(object): Users provide three functions: - - C{createCombiner}, which turns a V into a C (e.g., creates + - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) - - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of + - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) - - C{mergeCombiners}, to combine two C's into a single one (e.g., merges + - `mergeCombiners`, to combine two C's into a single one (e.g., merges the lists) To avoid memory allocation, both mergeValue and mergeCombiners are allowed to @@ -2072,9 +2072,9 @@ class RDD(object): # TODO: add variant with custom parittioner def cogroup(self, other, numPartitions=None): """ - For each key k in C{self} or C{other}, return a resulting RDD that - contains a tuple with the list of values for that key in C{self} as - well as C{other}. + For each key k in `self` or `other`, return a resulting RDD that + contains a tuple with the list of values for that key in `self` as + well as `other`. 
>>> x = sc.parallelize([("a", 1), ("b", 4)]) >>> y = sc.parallelize([("a", 2)]) @@ -2106,8 +2106,8 @@ class RDD(object): def subtractByKey(self, other, numPartitions=None): """ - Return each (key, value) pair in C{self} that has no pair with matching - key in C{other}. + Return each (key, value) pair in `self` that has no pair with matching + key in `other`. >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)]) >>> y = sc.parallelize([("a", 3), ("c", None)]) @@ -2121,7 +2121,7 @@ class RDD(object): def subtract(self, other, numPartitions=None): """ - Return each value in C{self} that is not contained in C{other}. + Return each value in `self` that is not contained in `other`. >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)]) >>> y = sc.parallelize([("a", 3), ("c", None)]) @@ -2134,7 +2134,7 @@ class RDD(object): def keyBy(self, f): """ - Creates tuples of the elements in this RDD by applying C{f}. + Creates tuples of the elements in this RDD by applying `f`. >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x) >>> y = sc.parallelize(zip(range(0,5), range(0,5))) @@ -2260,7 +2260,7 @@ class RDD(object): Items in the kth partition will get ids k, n+k, 2*n+k, ..., where n is the number of partitions. So there may exist gaps, but this method won't trigger a spark job, which is different from - L{zipWithIndex} + :meth:`zipWithIndex`. >>> sc.parallelize(["a", "b", "c", "d", "e"], 3).zipWithUniqueId().collect() [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)] diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py index ddca2a7..00f6081 100644 --- a/python/pyspark/serializers.py +++ b/python/pyspark/serializers.py @@ -19,12 +19,12 @@ PySpark supports custom serializers for transferring data; this can improve performance. -By default, PySpark uses L{PickleSerializer} to serialize objects using Python's -C{cPickle} serializer, which can serialize nearly any Python object. 
-Other serializers, like L{MarshalSerializer}, support fewer datatypes but can be +By default, PySpark uses :class:`PickleSerializer` to serialize objects using Python's +`cPickle` serializer, which can serialize nearly any Python object. +Other serializers, like :class:`MarshalSerializer`, support fewer datatypes but can be faster. -The serializer is chosen when creating L{SparkContext}: +The serializer is chosen when creating :class:`SparkContext`: >>> from pyspark.context import SparkContext >>> from pyspark.serializers import MarshalSerializer @@ -34,7 +34,7 @@ The serializer is chosen when creating L{SparkContext}: >>> sc.stop() PySpark serializes objects in batches; by default, the batch size is chosen based -on the size of objects and is also configurable by SparkContext's C{batchSize} +on the size of objects and is also configurable by SparkContext's `batchSize` parameter: >>> sc = SparkContext('local', 'test', batchSize=2) @@ -129,7 +129,7 @@ class FramedSerializer(Serializer): """ Serializer that writes objects as a stream of (length, data) pairs, - where C{length} is a 32-bit integer and data is C{length} bytes. + where `length` is a 32-bit integer and data is `length` bytes. """ def __init__(self): diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index e666973..87d4b81 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -408,7 +408,7 @@ class DataFrame(object): """Returns a checkpointed version of this Dataset. Checkpointing can be used to truncate the logical plan of this DataFrame, which is especially useful in iterative algorithms where the plan may grow exponentially. It will be saved to files inside the checkpoint - directory set with L{SparkContext.setCheckpointDir()}. + directory set with :meth:`SparkContext.setCheckpointDir`. 
:param eager: Whether to checkpoint this DataFrame immediately @@ -581,9 +581,9 @@ class DataFrame(object): @since(1.3) def cache(self): - """Persists the :class:`DataFrame` with the default storage level (C{MEMORY_AND_DISK}). + """Persists the :class:`DataFrame` with the default storage level (`MEMORY_AND_DISK`). - .. note:: The default storage level has changed to C{MEMORY_AND_DISK} to match Scala in 2.0. + .. note:: The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0. """ self.is_cached = True self._jdf.cache() @@ -594,9 +594,9 @@ class DataFrame(object): """Sets the storage level to persist the contents of the :class:`DataFrame` across operations after the first time it is computed. This can only be used to assign a new storage level if the :class:`DataFrame` does not have a storage level set yet. - If no storage level is specified defaults to (C{MEMORY_AND_DISK}). + If no storage level is specified defaults to (`MEMORY_AND_DISK`). - .. note:: The default storage level has changed to C{MEMORY_AND_DISK} to match Scala in 2.0. + .. note:: The default storage level has changed to `MEMORY_AND_DISK` to match Scala in 2.0. """ self.is_cached = True javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel) diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py index f9b12f1..da84fc1 100644 --- a/python/pyspark/sql/types.py +++ b/python/pyspark/sql/types.py @@ -1405,7 +1405,7 @@ def _create_row(fields, values): class Row(tuple): """ - A row in L{DataFrame}. + A row in :class:`DataFrame`. The fields in it can be accessed: * like attributes (``row.key``) diff --git a/python/pyspark/streaming/context.py b/python/pyspark/streaming/context.py index 6fbe26b6..769121c 100644 --- a/python/pyspark/streaming/context.py +++ b/python/pyspark/streaming/context.py @@ -33,7 +33,7 @@ class StreamingContext(object): """ Main entry point for Spark Streaming functionality. 
A StreamingContext represents the connection to a Spark cluster, and can be used to create - L{DStream} various input sources. It can be from an existing L{SparkContext}. + :class:`DStream` various input sources. It can be from an existing :class:`SparkContext`. After creating and transforming DStreams, the streaming computation can be started and stopped using `context.start()` and `context.stop()`, respectively. `context.awaitTermination()` allows the current thread @@ -48,8 +48,8 @@ class StreamingContext(object): """ Create a new StreamingContext. - @param sparkContext: L{SparkContext} object. - @param batchDuration: the time interval (in seconds) at which streaming + :param sparkContext: :class:`SparkContext` object. + :param batchDuration: the time interval (in seconds) at which streaming data will be divided into batches """ @@ -92,8 +92,8 @@ class StreamingContext(object): recreated from the checkpoint data. If the data does not exist, then the provided setupFunc will be used to create a new context. - @param checkpointPath: Checkpoint directory used in an earlier streaming program - @param setupFunc: Function to create a new context and setup DStreams + :param checkpointPath: Checkpoint directory used in an earlier streaming program + :param setupFunc: Function to create a new context and setup DStreams """ cls._ensure_initialized() gw = SparkContext._gateway @@ -149,10 +149,10 @@ class StreamingContext(object): valid checkpoint data, then setupFunc will be called to create a new context and setup DStreams. - @param checkpointPath: Checkpoint directory used in an earlier streaming program. Can be + :param checkpointPath: Checkpoint directory used in an earlier streaming program. Can be None if the intention is to always create a new context when there is no active context. 
- @param setupFunc: Function to create a new JavaStreamingContext and setup DStreams + :param setupFunc: Function to create a new JavaStreamingContext and setup DStreams """ if setupFunc is None: @@ -183,7 +183,7 @@ class StreamingContext(object): """ Wait for the execution to stop. - @param timeout: time to wait in seconds + :param timeout: time to wait in seconds """ if timeout is None: self._jssc.awaitTermination() @@ -196,7 +196,7 @@ class StreamingContext(object): throw the reported error during the execution; or `false` if the waiting time elapsed before returning from the method. - @param timeout: time to wait in seconds + :param timeout: time to wait in seconds """ return self._jssc.awaitTerminationOrTimeout(int(timeout * 1000)) @@ -205,8 +205,8 @@ class StreamingContext(object): Stop the execution of the streams, with option of ensuring all received data has been processed. - @param stopSparkContext: Stop the associated SparkContext or not - @param stopGracefully: Stop gracefully by waiting for the processing + :param stopSparkContext: Stop the associated SparkContext or not + :param stopGracefully: Stop gracefully by waiting for the processing of all received data to be completed """ self._jssc.stop(stopSparkContext, stopGraceFully) @@ -223,7 +223,7 @@ class StreamingContext(object): the RDDs (if the developer wishes to query old data outside the DStream computation). - @param duration: Minimum duration (in seconds) that each DStream + :param duration: Minimum duration (in seconds) that each DStream should remember its RDDs """ self._jssc.remember(self._jduration(duration)) @@ -233,7 +233,7 @@ class StreamingContext(object): Sets the context to periodically checkpoint the DStream operations for master fault-tolerance. The graph will be checkpointed every batch interval. 
- @param directory: HDFS-compatible directory where the checkpoint data + :param directory: HDFS-compatible directory where the checkpoint data will be reliably stored """ self._jssc.checkpoint(directory) @@ -244,9 +244,9 @@ class StreamingContext(object): a TCP socket and receive byte is interpreted as UTF8 encoded ``\\n`` delimited lines. - @param hostname: Hostname to connect to for receiving data - @param port: Port to connect to for receiving data - @param storageLevel: Storage level to use for storing the received objects + :param hostname: Hostname to connect to for receiving data + :param port: Port to connect to for receiving data + :param storageLevel: Storage level to use for storing the received objects """ jlevel = self._sc._getJavaStorageLevel(storageLevel) return DStream(self._jssc.socketTextStream(hostname, port, jlevel), self, @@ -270,8 +270,8 @@ class StreamingContext(object): them from another location within the same file system. File names starting with . are ignored. - @param directory: Directory to load data from - @param recordLength: Length of each record in bytes + :param directory: Directory to load data from + :param recordLength: Length of each record in bytes """ return DStream(self._jssc.binaryRecordsStream(directory, recordLength), self, NoOpSerializer()) @@ -290,9 +290,9 @@ class StreamingContext(object): .. note:: Changes to the queue after the stream is created will not be recognized. - @param rdds: Queue of RDDs - @param oneAtATime: pick one rdd each time or pick all of them once. - @param default: The default rdd if no more in rdds + :param rdds: Queue of RDDs + :param oneAtATime: pick one rdd each time or pick all of them once. 
+ :param default: The default rdd if no more in rdds """ if default and not isinstance(default, RDD): default = self._sc.parallelize(default) diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index c253e5c..60562a6 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -41,11 +41,11 @@ class DStream(object): """ A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous sequence of RDDs (of the same type) representing a - continuous stream of data (see L{RDD} in the Spark core documentation + continuous stream of data (see :class:`RDD` in the Spark core documentation for more details on RDDs). DStreams can either be created from live data (such as, data from TCP - sockets, etc.) using a L{StreamingContext} or it can be + sockets, etc.) using a :class:`StreamingContext` or it can be generated by transforming existing DStreams using operations such as `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream periodically generates a RDD, either @@ -167,7 +167,7 @@ class DStream(object): """ Print the first num elements of each RDD generated in this DStream. - @param num: the number of elements from the first will be printed. + :param num: the number of elements from the first will be printed. """ def takeAndPrint(time, rdd): taken = rdd.take(num + 1) @@ -210,7 +210,7 @@ class DStream(object): def cache(self): """ Persist the RDDs of this DStream with the default storage level - (C{MEMORY_ONLY}). + (`MEMORY_ONLY`). 
""" self.is_cached = True self.persist(StorageLevel.MEMORY_ONLY) @@ -229,7 +229,7 @@ class DStream(object): """ Enable periodic checkpointing of RDDs of this DStream - @param interval: time in seconds, after each period of that, generated + :param interval: time in seconds, after each period of that, generated RDD will be checkpointed """ self.is_checkpointed = True @@ -333,7 +333,7 @@ class DStream(object): """ Return a new DStream by unifying data of another DStream with this DStream. - @param other: Another DStream having the same interval (i.e., slideDuration) + :param other: Another DStream having the same interval (i.e., slideDuration) as this DStream. """ if self._slideDuration != other._slideDuration: @@ -429,9 +429,9 @@ class DStream(object): Return a new DStream in which each RDD contains all the elements in seen in a sliding window of time over this DStream. - @param windowDuration: width of the window; must be a multiple of this DStream's + :param windowDuration: width of the window; must be a multiple of this DStream's batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which + :param slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval """ @@ -455,13 +455,13 @@ class DStream(object): 2. "inverse reduce" the old values that left the window (e.g., subtracting old counts) This is more efficient than `invReduceFunc` is None. 
- @param reduceFunc: associative and commutative reduce function - @param invReduceFunc: inverse reduce function of `reduceFunc`; such that for all y, + :param reduceFunc: associative and commutative reduce function + :param invReduceFunc: inverse reduce function of `reduceFunc`; such that for all y, and invertible x: `invReduceFunc(reduceFunc(x, y), x) = y` - @param windowDuration: width of the window; must be a multiple of this DStream's + :param windowDuration: width of the window; must be a multiple of this DStream's batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which + :param slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval """ @@ -487,12 +487,12 @@ class DStream(object): Return a new DStream in which each RDD contains the count of distinct elements in RDDs in a sliding window over this DStream. - @param windowDuration: width of the window; must be a multiple of this DStream's + :param windowDuration: width of the window; must be a multiple of this DStream's batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which + :param slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval - @param numPartitions: number of partitions of each RDD in the new DStream. + :param numPartitions: number of partitions of each RDD in the new DStream. """ keyed = self.map(lambda x: (x, 1)) counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub, @@ -504,12 +504,12 @@ class DStream(object): Return a new DStream by applying `groupByKey` over a sliding window. Similar to `DStream.groupByKey()`, but applies it over a sliding window. 
- @param windowDuration: width of the window; must be a multiple of this DStream's + :param windowDuration: width of the window; must be a multiple of this DStream's batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which + :param slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval - @param numPartitions: Number of partitions of each RDD in the new DStream. + :param numPartitions: Number of partitions of each RDD in the new DStream. """ ls = self.mapValues(lambda x: [x]) grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, lambda a, b: a[len(b):], @@ -528,15 +528,15 @@ class DStream(object): `invFunc` can be None, then it will reduce all the RDDs in window, could be slower than having `invFunc`. - @param func: associative and commutative reduce function - @param invFunc: inverse function of `reduceFunc` - @param windowDuration: width of the window; must be a multiple of this DStream's + :param func: associative and commutative reduce function + :param invFunc: inverse function of `reduceFunc` + :param windowDuration: width of the window; must be a multiple of this DStream's batching interval - @param slideDuration: sliding interval of the window (i.e., the interval after which + :param slideDuration: sliding interval of the window (i.e., the interval after which the new DStream will generate RDDs); must be a multiple of this DStream's batching interval - @param numPartitions: number of partitions of each RDD in the new DStream. - @param filterFunc: function to filter expired key-value pairs; + :param numPartitions: number of partitions of each RDD in the new DStream. 
+ :param filterFunc: function to filter expired key-value pairs; only pairs that satisfy the function are retained set this to null if you do not want to filter """ @@ -578,7 +578,7 @@ class DStream(object): Return a new "state" DStream where the state for each key is updated by applying the given function on the previous state of the key and the new values of the key. - @param updateFunc: State update function. If this function returns None, then + :param updateFunc: State update function. If this function returns None, then corresponding state key-value pair will be eliminated. """ if numPartitions is None: diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py index dff5e18..6d28491 100644 --- a/python/pyspark/taskcontext.py +++ b/python/pyspark/taskcontext.py @@ -28,7 +28,7 @@ class TaskContext(object): Contextual information about a task which can be read or mutated during execution. To access the TaskContext for a running task, use: - L{TaskContext.get()}. + :meth:`TaskContext.get`. """ _taskContext = None diff --git a/python/pyspark/testing/streamingutils.py b/python/pyspark/testing/streamingutils.py index 4c27f8a..a6abc2e 100644 --- a/python/pyspark/testing/streamingutils.py +++ b/python/pyspark/testing/streamingutils.py @@ -137,9 +137,9 @@ class PySparkStreamingTestCase(unittest.TestCase): def _test_func(self, input, func, expected, sort=False, input2=None): """ - @param input: dataset for the test. This should be list of lists. - @param func: wrapped function. This function should return PythonDStream object. - @param expected: expected output for this testcase. + :param input: dataset for the test. This should be list of lists. + :param func: wrapped function. This function should return PythonDStream object. + :param expected: expected output for this testcase. 
""" if not isinstance(input[0], RDD): input = [self.sc.parallelize(d, 1) for d in input] --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org