This is an automated email from the ASF dual-hosted git repository.

meng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new fe75ff8  [SPARK-28206][PYTHON] Remove the legacy Epydoc in PySpark API 
documentation
fe75ff8 is described below

commit fe75ff8bea3330a10aba1a61f3aba42e541195a8
Author: HyukjinKwon <gurwls...@apache.org>
AuthorDate: Fri Jul 5 10:08:22 2019 -0700

    [SPARK-28206][PYTHON] Remove the legacy Epydoc in PySpark API documentation
    
    ## What changes were proposed in this pull request?
    
    It seems we have been generating the PySpark API documentation with Epydoc 
almost from the very beginning (see 
https://github.com/apache/spark/commit/85b8f2c64f0fc4be5645d8736629fc082cb3587b).
    
    This fixes an actual issue:
    
    Before:
    
    ![Screen Shot 2019-07-05 at 8 20 01 
PM](https://user-images.githubusercontent.com/6477701/60720491-e9879180-9f65-11e9-9562-100830a456cd.png)
    
    After:
    
    ![Screen Shot 2019-07-05 at 8 20 05 
PM](https://user-images.githubusercontent.com/6477701/60720495-ec828200-9f65-11e9-8277-8f689e292cb0.png)
    
    This appears to be a bug in the `epytext` plugin's conversion from 
`@param` to `:param` syntax. See also [Epydoc 
syntax](http://epydoc.sourceforge.net/manual-epytext.html).
    
    Actually, Epydoc syntax violates 
[PEP-257](https://www.python.org/dev/peps/pep-0257/) IIRC, and it also prevents 
us from enabling some rules for the doctest linter.
    
    We should remove this legacy syntax, and I guess Spark 3 is a good time to 
do it.
    
    ## How was this patch tested?
    
    Manually built the docs and checked each change.
    
    I had to find the Epydoc syntax manually, for instance with 
`git grep -r "L{"`.
    
    Closes #25060 from HyukjinKwon/SPARK-28206.
    
    Authored-by: HyukjinKwon <gurwls...@apache.org>
    Signed-off-by: Xiangrui Meng <m...@databricks.com>
---
 python/docs/conf.py                      |   1 -
 python/docs/epytext.py                   |  30 --------
 python/pyspark/accumulators.py           |  14 ++--
 python/pyspark/broadcast.py              |   6 +-
 python/pyspark/conf.py                   |   8 +--
 python/pyspark/context.py                |  56 +++++++--------
 python/pyspark/files.py                  |   7 +-
 python/pyspark/ml/feature.py             |   2 +-
 python/pyspark/ml/linalg/__init__.py     |   8 +--
 python/pyspark/mllib/classification.py   |   4 +-
 python/pyspark/mllib/clustering.py       |   6 +-
 python/pyspark/mllib/linalg/__init__.py  |   8 +--
 python/pyspark/mllib/random.py           |   6 +-
 python/pyspark/mllib/stat/_statistics.py |   4 +-
 python/pyspark/mllib/util.py             |   4 +-
 python/pyspark/rdd.py                    | 114 +++++++++++++++----------------
 python/pyspark/serializers.py            |  12 ++--
 python/pyspark/sql/dataframe.py          |  10 +--
 python/pyspark/sql/types.py              |   2 +-
 python/pyspark/streaming/context.py      |  42 ++++++------
 python/pyspark/streaming/dstream.py      |  50 +++++++-------
 python/pyspark/taskcontext.py            |   2 +-
 python/pyspark/testing/streamingutils.py |   6 +-
 23 files changed, 185 insertions(+), 217 deletions(-)

diff --git a/python/docs/conf.py b/python/docs/conf.py
index f507ee3..9e7afb7 100644
--- a/python/docs/conf.py
+++ b/python/docs/conf.py
@@ -31,7 +31,6 @@ needs_sphinx = '1.2'
 extensions = [
     'sphinx.ext.autodoc',
     'sphinx.ext.viewcode',
-    'epytext',
     'sphinx.ext.mathjax',
 ]
 
diff --git a/python/docs/epytext.py b/python/docs/epytext.py
deleted file mode 100644
index 4bbbf65..0000000
--- a/python/docs/epytext.py
+++ /dev/null
@@ -1,30 +0,0 @@
-import re
-
-RULES = (
-    (r"<(!BLANKLINE)[\w.]+>", r""),
-    (r"L{([\w.()]+)}", r":class:`\1`"),
-    (r"[LC]{(\w+\.\w+)\(\)}", r":func:`\1`"),
-    (r"C{([\w.()]+)}", r":class:`\1`"),
-    (r"[IBCM]{([^}]+)}", r"`\1`"),
-    ('pyspark.rdd.RDD', 'RDD'),
-)
-
-
-def _convert_epytext(line):
-    """
-    >>> _convert_epytext("L{A}")
-    :class:`A`
-    """
-    line = line.replace('@', ':')
-    for p, sub in RULES:
-        line = re.sub(p, sub, line)
-    return line
-
-
-def _process_docstring(app, what, name, obj, options, lines):
-    for i in range(len(lines)):
-        lines[i] = _convert_epytext(lines[i])
-
-
-def setup(app):
-    app.connect("autodoc-process-docstring", _process_docstring)
diff --git a/python/pyspark/accumulators.py b/python/pyspark/accumulators.py
index 00ec094..a5d5132 100644
--- a/python/pyspark/accumulators.py
+++ b/python/pyspark/accumulators.py
@@ -123,13 +123,13 @@ class Accumulator(object):
 
     """
     A shared variable that can be accumulated, i.e., has a commutative and 
associative "add"
-    operation. Worker tasks on a Spark cluster can add values to an 
Accumulator with the C{+=}
-    operator, but only the driver program is allowed to access its value, 
using C{value}.
+    operation. Worker tasks on a Spark cluster can add values to an 
Accumulator with the `+=`
+    operator, but only the driver program is allowed to access its value, 
using `value`.
     Updates from the workers get propagated automatically to the driver 
program.
 
-    While C{SparkContext} supports accumulators for primitive data types like 
C{int} and
-    C{float}, users can also define accumulators for custom types by providing 
a custom
-    L{AccumulatorParam} object. Refer to the doctest of this module for an 
example.
+    While :class:`SparkContext` supports accumulators for primitive data types 
like :class:`int` and
+    :class:`float`, users can also define accumulators for custom types by 
providing a custom
+    :class:`AccumulatorParam` object. Refer to the doctest of this module for 
an example.
     """
 
     def __init__(self, aid, value, accum_param):
@@ -185,14 +185,14 @@ class AccumulatorParam(object):
     def zero(self, value):
         """
         Provide a "zero value" for the type, compatible in dimensions with the
-        provided C{value} (e.g., a zero vector)
+        provided `value` (e.g., a zero vector)
         """
         raise NotImplementedError
 
     def addInPlace(self, value1, value2):
         """
         Add two values of the accumulator's data type, returning a new value;
-        for efficiency, can also update C{value1} in place and return it.
+        for efficiency, can also update `value1` in place and return it.
         """
         raise NotImplementedError
 
diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index cca64b5..a97d409 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -49,8 +49,8 @@ def _from_id(bid):
 class Broadcast(object):
 
     """
-    A broadcast variable created with L{SparkContext.broadcast()}.
-    Access its value through C{.value}.
+    A broadcast variable created with :meth:`SparkContext.broadcast`.
+    Access its value through :attr:`value`.
 
     Examples:
 
@@ -69,7 +69,7 @@ class Broadcast(object):
     def __init__(self, sc=None, value=None, pickle_registry=None, path=None,
                  sock_file=None):
         """
-        Should not be called directly by users -- use 
L{SparkContext.broadcast()}
+        Should not be called directly by users -- use 
:meth:`SparkContext.broadcast`
         instead.
         """
         if sc is not None:
diff --git a/python/pyspark/conf.py b/python/pyspark/conf.py
index ab429d9..2024260 100644
--- a/python/pyspark/conf.py
+++ b/python/pyspark/conf.py
@@ -79,16 +79,16 @@ class SparkConf(object):
     parameters as key-value pairs.
 
     Most of the time, you would create a SparkConf object with
-    C{SparkConf()}, which will load values from C{spark.*} Java system
+    ``SparkConf()``, which will load values from `spark.*` Java system
     properties as well. In this case, any parameters you set directly on
-    the C{SparkConf} object take priority over system properties.
+    the :class:`SparkConf` object take priority over system properties.
 
-    For unit tests, you can also call C{SparkConf(false)} to skip
+    For unit tests, you can also call ``SparkConf(false)`` to skip
     loading external settings and get the same configuration no matter
     what the system properties are.
 
     All setter methods in this class support chaining. For example,
-    you can write C{conf.setMaster("local").setAppName("My app")}.
+    you can write ``conf.setMaster("local").setAppName("My app")``.
 
     .. note:: Once a SparkConf object is passed to Spark, it is cloned
         and can no longer be modified by the user.
diff --git a/python/pyspark/context.py b/python/pyspark/context.py
index a835298..69020e6 100644
--- a/python/pyspark/context.py
+++ b/python/pyspark/context.py
@@ -61,7 +61,7 @@ class SparkContext(object):
 
     """
     Main entry point for Spark functionality. A SparkContext represents the
-    connection to a Spark cluster, and can be used to create L{RDD} and
+    connection to a Spark cluster, and can be used to create :class:`RDD` and
     broadcast variables on that cluster.
 
     .. note:: Only one :class:`SparkContext` should be active per JVM. You 
must `stop()`
@@ -86,7 +86,7 @@ class SparkContext(object):
                  gateway=None, jsc=None, profiler_cls=BasicProfiler):
         """
         Create a new SparkContext. At least the master and app name should be 
set,
-        either through the named parameters here or through C{conf}.
+        either through the named parameters here or through `conf`.
 
         :param master: Cluster URL to connect to
                (e.g. mesos://host:port, spark://host:port, local[4]).
@@ -102,7 +102,7 @@ class SparkContext(object):
                the batch size based on object sizes, or -1 to use an unlimited
                batch size
         :param serializer: The serializer for RDDs.
-        :param conf: A L{SparkConf} object setting Spark properties.
+        :param conf: A :class:`SparkConf` object setting Spark properties.
         :param gateway: Use an existing gateway and JVM, otherwise a new JVM
                will be instantiated.
         :param jsc: The JavaSparkContext instance (optional).
@@ -576,7 +576,7 @@ class SparkContext(object):
 
     def pickleFile(self, name, minPartitions=None):
         """
-        Load an RDD previously saved using L{RDD.saveAsPickleFile} method.
+        Load an RDD previously saved using :meth:`RDD.saveAsPickleFile` method.
 
         >>> tmpFile = NamedTemporaryFile(delete=True)
         >>> tmpFile.close()
@@ -624,20 +624,24 @@ class SparkContext(object):
         as `utf-8`), which is faster and smaller than unicode. (Added in
         Spark 1.2)
 
-        For example, if you have the following files::
+        For example, if you have the following files:
 
-          hdfs://a-hdfs-path/part-00000
-          hdfs://a-hdfs-path/part-00001
-          ...
-          hdfs://a-hdfs-path/part-nnnnn
+        .. code-block:: text
 
-        Do C{rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")},
-        then C{rdd} contains::
+            hdfs://a-hdfs-path/part-00000
+            hdfs://a-hdfs-path/part-00001
+            ...
+            hdfs://a-hdfs-path/part-nnnnn
+
+        Do ``rdd = sparkContext.wholeTextFiles("hdfs://a-hdfs-path")``,
+        then ``rdd`` contains:
 
-          (a-hdfs-path/part-00000, its content)
-          (a-hdfs-path/part-00001, its content)
-          ...
-          (a-hdfs-path/part-nnnnn, its content)
+        .. code-block:: text
+
+            (a-hdfs-path/part-00000, its content)
+            (a-hdfs-path/part-00001, its content)
+            ...
+            (a-hdfs-path/part-nnnnn, its content)
 
         .. note:: Small files are preferred, as each file will be loaded
             fully in memory.
@@ -705,7 +709,7 @@ class SparkContext(object):
                and value Writable classes
             2. Serialization is attempted via Pyrolite pickling
             3. If this fails, the fallback is to call 'toString' on each key 
and value
-            4. C{PickleSerializer} is used to deserialize pickled objects on 
the Python side
+            4. :class:`PickleSerializer` is used to deserialize pickled 
objects on the Python side
 
         :param path: path to sequncefile
         :param keyClass: fully qualified classname of key Writable class
@@ -872,8 +876,7 @@ class SparkContext(object):
 
     def broadcast(self, value):
         """
-        Broadcast a read-only variable to the cluster, returning a
-        L{Broadcast<pyspark.broadcast.Broadcast>}
+        Broadcast a read-only variable to the cluster, returning a 
:class:`Broadcast`
         object for reading it in distributed functions. The variable will
         be sent to each cluster only once.
         """
@@ -881,8 +884,8 @@ class SparkContext(object):
 
     def accumulator(self, value, accum_param=None):
         """
-        Create an L{Accumulator} with the given initial value, using a given
-        L{AccumulatorParam} helper object to define how to add values of the
+        Create an :class:`Accumulator` with the given initial value, using a 
given
+        :class:`AccumulatorParam` helper object to define how to add values of 
the
         data type if provided. Default AccumulatorParams are used for integers
         and floating-point numbers if you do not provide one. For other types,
         a custom AccumulatorParam can be used.
@@ -902,12 +905,11 @@ class SparkContext(object):
     def addFile(self, path, recursive=False):
         """
         Add a file to be downloaded with this Spark job on every node.
-        The C{path} passed can be either a local file, a file in HDFS
+        The `path` passed can be either a local file, a file in HDFS
         (or other Hadoop-supported filesystems), or an HTTP, HTTPS or
         FTP URI.
 
-        To access the file in Spark jobs, use
-        L{SparkFiles.get(fileName)<pyspark.files.SparkFiles.get>} with the
+        To access the file in Spark jobs, use :meth:`SparkFiles.get` with the
         filename to find its download location.
 
         A directory can be given if the recursive option is set to True.
@@ -932,7 +934,7 @@ class SparkContext(object):
     def addPyFile(self, path):
         """
         Add a .py or .zip dependency for all tasks to be executed on this
-        SparkContext in the future.  The C{path} passed can be either a local
+        SparkContext in the future.  The `path` passed can be either a local
         file, a file in HDFS (or other Hadoop-supported filesystems), or an
         HTTP, HTTPS or FTP URI.
 
@@ -978,7 +980,7 @@ class SparkContext(object):
         Application programmers can use this method to group all those jobs 
together and give a
         group description. Once set, the Spark web UI will associate such jobs 
with this group.
 
-        The application can use L{SparkContext.cancelJobGroup} to cancel all
+        The application can use :meth:`SparkContext.cancelJobGroup` to cancel 
all
         running jobs in this group.
 
         >>> import threading
@@ -1023,7 +1025,7 @@ class SparkContext(object):
     def getLocalProperty(self, key):
         """
         Get a local property set in this thread, or null if it is missing. See
-        L{setLocalProperty}
+        :meth:`setLocalProperty`.
         """
         return self._jsc.getLocalProperty(key)
 
@@ -1041,7 +1043,7 @@ class SparkContext(object):
 
     def cancelJobGroup(self, groupId):
         """
-        Cancel active jobs for the specified group. See 
L{SparkContext.setJobGroup}
+        Cancel active jobs for the specified group. See 
:meth:`SparkContext.setJobGroup`.
         for more information.
         """
         self._jsc.sc().cancelJobGroup(groupId)
diff --git a/python/pyspark/files.py b/python/pyspark/files.py
index 797573f..c08db41 100644
--- a/python/pyspark/files.py
+++ b/python/pyspark/files.py
@@ -24,8 +24,7 @@ __all__ = ['SparkFiles']
 class SparkFiles(object):
 
     """
-    Resolves paths to files added through
-    L{SparkContext.addFile()<pyspark.context.SparkContext.addFile>}.
+    Resolves paths to files added through :meth:`SparkContext.addFile`.
 
     SparkFiles contains only classmethods; users should not create SparkFiles
     instances.
@@ -41,7 +40,7 @@ class SparkFiles(object):
     @classmethod
     def get(cls, filename):
         """
-        Get the absolute path of a file added through 
C{SparkContext.addFile()}.
+        Get the absolute path of a file added through 
:meth:`SparkContext.addFile`.
         """
         path = os.path.join(SparkFiles.getRootDirectory(), filename)
         return os.path.abspath(path)
@@ -50,7 +49,7 @@ class SparkFiles(object):
     def getRootDirectory(cls):
         """
         Get the root directory that contains files added through
-        C{SparkContext.addFile()}.
+        :meth:`SparkContext.addFile`.
         """
         if cls._is_running_on_worker:
             return cls._root_directory
diff --git a/python/pyspark/ml/feature.py b/python/pyspark/ml/feature.py
index 9827a2a..78d0269 100755
--- a/python/pyspark/ml/feature.py
+++ b/python/pyspark/ml/feature.py
@@ -2560,7 +2560,7 @@ class IndexToString(JavaTransformer, HasInputCol, 
HasOutputCol, JavaMLReadable,
     corresponding string values.
     The index-string mapping is either from the ML attributes of the input 
column,
     or from user-supplied labels (which take precedence over ML attributes).
-    See L{StringIndexer} for converting strings into indices.
+    See :class:`StringIndexer` for converting strings into indices.
 
     .. versionadded:: 1.6.0
     """
diff --git a/python/pyspark/ml/linalg/__init__.py 
b/python/pyspark/ml/linalg/__init__.py
index f6ddc09..a79d5e5 100644
--- a/python/pyspark/ml/linalg/__init__.py
+++ b/python/pyspark/ml/linalg/__init__.py
@@ -17,9 +17,9 @@
 
 """
 MLlib utilities for linear algebra. For dense vectors, MLlib
-uses the NumPy C{array} type, so you can simply pass NumPy arrays
-around. For sparse vectors, users can construct a L{SparseVector}
-object from MLlib or pass SciPy C{scipy.sparse} column vectors if
+uses the NumPy `array` type, so you can simply pass NumPy arrays
+around. For sparse vectors, users can construct a :class:`SparseVector`
+object from MLlib or pass SciPy `scipy.sparse` column vectors if
 SciPy is available in their environment.
 """
 
@@ -758,7 +758,7 @@ class Vectors(object):
     .. note:: Dense vectors are simply represented as NumPy array objects,
         so there is no need to covert them for use in MLlib. For sparse 
vectors,
         the factory methods in this class create an MLlib-compatible type, or 
users
-        can pass in SciPy's C{scipy.sparse} column vectors.
+        can pass in SciPy's `scipy.sparse` column vectors.
     """
 
     @staticmethod
diff --git a/python/pyspark/mllib/classification.py 
b/python/pyspark/mllib/classification.py
index d2037be..c52da2a 100644
--- a/python/pyspark/mllib/classification.py
+++ b/python/pyspark/mllib/classification.py
@@ -659,11 +659,11 @@ class NaiveBayes(object):
         Train a Naive Bayes model given an RDD of (label, features)
         vectors.
 
-        This is the Multinomial NB (U{http://tinyurl.com/lsdw6p}) which
+        This is the `Multinomial NB <http://tinyurl.com/lsdw6p>`_ which
         can handle all kinds of discrete data.  For example, by
         converting documents into TF-IDF vectors, it can be used for
         document classification. By making every vector a 0-1 vector,
-        it can also be used as Bernoulli NB (U{http://tinyurl.com/p7c96j6}).
+        it can also be used as `Bernoulli NB <http://tinyurl.com/p7c96j6>`_.
         The input feature values must be nonnegative.
 
         :param data:
diff --git a/python/pyspark/mllib/clustering.py 
b/python/pyspark/mllib/clustering.py
index 58da434..3524fcf 100644
--- a/python/pyspark/mllib/clustering.py
+++ b/python/pyspark/mllib/clustering.py
@@ -130,9 +130,9 @@ class BisectingKMeans(object):
     clusters, larger clusters get higher priority.
 
     Based on
-    U{http://glaros.dtc.umn.edu/gkhome/fetch/papers/docclusterKDDTMW00.pdf}
-    Steinbach, Karypis, and Kumar, A comparison of document clustering
-    techniques, KDD Workshop on Text Mining, 2000.
+    `Steinbach, Karypis, and Kumar, A comparison of document clustering
+    techniques, KDD Workshop on Text Mining, 2000
+    <http://glaros.dtc.umn.edu/gkhome/fetch/papers/docclusterKDDTMW00.pdf>`_.
 
     .. versionadded:: 2.0.0
     """
diff --git a/python/pyspark/mllib/linalg/__init__.py 
b/python/pyspark/mllib/linalg/__init__.py
index df411d7..cd09621 100644
--- a/python/pyspark/mllib/linalg/__init__.py
+++ b/python/pyspark/mllib/linalg/__init__.py
@@ -17,9 +17,9 @@
 
 """
 MLlib utilities for linear algebra. For dense vectors, MLlib
-uses the NumPy C{array} type, so you can simply pass NumPy arrays
-around. For sparse vectors, users can construct a L{SparseVector}
-object from MLlib or pass SciPy C{scipy.sparse} column vectors if
+uses the NumPy `array` type, so you can simply pass NumPy arrays
+around. For sparse vectors, users can construct a :class:`SparseVector`
+object from MLlib or pass SciPy `scipy.sparse` column vectors if
 SciPy is available in their environment.
 """
 
@@ -847,7 +847,7 @@ class Vectors(object):
     .. note:: Dense vectors are simply represented as NumPy array objects,
         so there is no need to covert them for use in MLlib. For sparse 
vectors,
         the factory methods in this class create an MLlib-compatible type, or 
users
-        can pass in SciPy's C{scipy.sparse} column vectors.
+        can pass in SciPy's `scipy.sparse` column vectors.
     """
 
     @staticmethod
diff --git a/python/pyspark/mllib/random.py b/python/pyspark/mllib/random.py
index a8833cb..6106c58 100644
--- a/python/pyspark/mllib/random.py
+++ b/python/pyspark/mllib/random.py
@@ -54,8 +54,7 @@ class RandomRDDs(object):
 
         To transform the distribution in the generated RDD from U(0.0, 1.0)
         to U(a, b), use
-        C{RandomRDDs.uniformRDD(sc, n, p, seed)\
-          .map(lambda v: a + (b - a) * v)}
+        ``RandomRDDs.uniformRDD(sc, n, p, seed).map(lambda v: a + (b - a) * 
v)``
 
         :param sc: SparkContext used to create the RDD.
         :param size: Size of the RDD.
@@ -85,8 +84,7 @@ class RandomRDDs(object):
 
         To transform the distribution in the generated RDD from standard normal
         to some other normal N(mean, sigma^2), use
-        C{RandomRDDs.normal(sc, n, p, seed)\
-          .map(lambda v: mean + sigma * v)}
+        ``RandomRDDs.normal(sc, n, p, seed).map(lambda v: mean + sigma * v)``
 
         :param sc: SparkContext used to create the RDD.
         :param size: Size of the RDD.
diff --git a/python/pyspark/mllib/stat/_statistics.py 
b/python/pyspark/mllib/stat/_statistics.py
index 6e89bfd..d49f741 100644
--- a/python/pyspark/mllib/stat/_statistics.py
+++ b/python/pyspark/mllib/stat/_statistics.py
@@ -98,10 +98,10 @@ class Statistics(object):
         """
         Compute the correlation (matrix) for the input RDD(s) using the
         specified method.
-        Methods currently supported: I{pearson (default), spearman}.
+        Methods currently supported: `pearson (default), spearman`.
 
         If a single RDD of Vectors is passed in, a correlation matrix
-        comparing the columns in the input RDD is returned. Use C{method=}
+        comparing the columns in the input RDD is returned. Use `method`
         to specify the method to be used for single RDD inout.
         If two RDDs of floats are passed in, a single float is returned.
 
diff --git a/python/pyspark/mllib/util.py b/python/pyspark/mllib/util.py
index 0190bf3..1a0ce42 100644
--- a/python/pyspark/mllib/util.py
+++ b/python/pyspark/mllib/util.py
@@ -95,7 +95,7 @@ class MLUtils(object):
                             which leads to inconsistent feature
                             dimensions.
         :param minPartitions: min number of partitions
-        @return: labeled data stored as an RDD of LabeledPoint
+        :return: labeled data stored as an RDD of LabeledPoint
 
         >>> from tempfile import NamedTemporaryFile
         >>> from pyspark.mllib.util import MLUtils
@@ -156,7 +156,7 @@ class MLUtils(object):
         :param path: file or directory path in any Hadoop-supported file
                      system URI
         :param minPartitions: min number of partitions
-        @return: labeled data stored as an RDD of LabeledPoint
+        :return: labeled data stored as an RDD of LabeledPoint
 
         >>> from tempfile import NamedTemporaryFile
         >>> from pyspark.mllib.util import MLUtils
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 8edb7f3..8bcc67a 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -286,13 +286,13 @@ class RDD(object):
     @property
     def context(self):
         """
-        The L{SparkContext} that this RDD was created on.
+        The :class:`SparkContext` that this RDD was created on.
         """
         return self.ctx
 
     def cache(self):
         """
-        Persist this RDD with the default storage level (C{MEMORY_ONLY}).
+        Persist this RDD with the default storage level (`MEMORY_ONLY`).
         """
         self.is_cached = True
         self.persist(StorageLevel.MEMORY_ONLY)
@@ -303,7 +303,7 @@ class RDD(object):
         Set this RDD's storage level to persist its values across operations
         after the first time it is computed. This can only be used to assign
         a new storage level if the RDD does not have a storage level set yet.
-        If no storage level is specified defaults to (C{MEMORY_ONLY}).
+        If no storage level is specified defaults to (`MEMORY_ONLY`).
 
         >>> rdd = sc.parallelize(["b", "a", "c"])
         >>> rdd.persist().is_cached
@@ -330,7 +330,7 @@ class RDD(object):
     def checkpoint(self):
         """
         Mark this RDD for checkpointing. It will be saved to a file inside the
-        checkpoint directory set with L{SparkContext.setCheckpointDir()} and
+        checkpoint directory set with :meth:`SparkContext.setCheckpointDir` and
         all references to its parent RDDs will be removed. This function must
         be called before any job has been executed on this RDD. It is strongly
         recommended that this RDD is persisted in memory, otherwise saving it
@@ -360,9 +360,9 @@ class RDD(object):
 
         This is NOT safe to use with dynamic allocation, which removes 
executors along
         with their cached blocks. If you must use both features, you are 
advised to set
-        L{spark.dynamicAllocation.cachedExecutorIdleTimeout} to a high value.
+        `spark.dynamicAllocation.cachedExecutorIdleTimeout` to a high value.
 
-        The checkpoint directory set through 
L{SparkContext.setCheckpointDir()} is not used.
+        The checkpoint directory set through 
:meth:`SparkContext.setCheckpointDir` is not used.
         """
         self._jrdd.rdd().localCheckpoint()
 
@@ -786,8 +786,8 @@ class RDD(object):
     def cartesian(self, other):
         """
         Return the Cartesian product of this RDD and another one, that is, the
-        RDD of all pairs of elements C{(a, b)} where C{a} is in C{self} and
-        C{b} is in C{other}.
+        RDD of all pairs of elements ``(a, b)`` where ``a`` is in `self` and
+        ``b`` is in `other`.
 
         >>> rdd = sc.parallelize([1, 2])
         >>> sorted(rdd.cartesian(rdd).collect())
@@ -960,9 +960,9 @@ class RDD(object):
         Aggregate the elements of each partition, and then the results for all
         the partitions, using a given associative function and a neutral "zero 
value."
 
-        The function C{op(t1, t2)} is allowed to modify C{t1} and return it
+        The function ``op(t1, t2)`` is allowed to modify ``t1`` and return it
         as its result value to avoid object allocation; however, it should not
-        modify C{t2}.
+        modify ``t2``.
 
         This behaves somewhat differently from fold operations implemented
         for non-distributed collections in functional languages like Scala.
@@ -995,9 +995,9 @@ class RDD(object):
         the partitions, using a given combine functions and a neutral "zero
         value."
 
-        The functions C{op(t1, t2)} is allowed to modify C{t1} and return it
+        The functions ``op(t1, t2)`` is allowed to modify ``t1`` and return it
         as its result value to avoid object allocation; however, it should not
-        modify C{t2}.
+        modify ``t2``.
 
         The first function (seqOp) can return a different result type, U, than
         the type of this RDD. Thus, we need one operation for merging a T into
@@ -1128,7 +1128,7 @@ class RDD(object):
 
     def stats(self):
         """
-        Return a L{StatCounter} object that captures the mean, variance
+        Return a :class:`StatCounter` object that captures the mean, variance
         and count of the RDD's elements in one operation.
         """
         def redFunc(left_counter, right_counter):
@@ -1467,10 +1467,10 @@ class RDD(object):
 
     def saveAsNewAPIHadoopDataset(self, conf, keyConverter=None, 
valueConverter=None):
         """
-        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any 
Hadoop file
+        Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to 
any Hadoop file
         system, using the new Hadoop OutputFormat API (mapreduce package). 
Keys/values are
         converted for output using either user specified converters or, by 
default,
-        L{org.apache.spark.api.python.JavaToWritableConverter}.
+        "org.apache.spark.api.python.JavaToWritableConverter".
 
         :param conf: Hadoop job configuration, passed in as a dict
         :param keyConverter: (None by default)
@@ -1484,11 +1484,11 @@ class RDD(object):
     def saveAsNewAPIHadoopFile(self, path, outputFormatClass, keyClass=None, 
valueClass=None,
                                keyConverter=None, valueConverter=None, 
conf=None):
         """
-        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any 
Hadoop file
+        Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to 
any Hadoop file
         system, using the new Hadoop OutputFormat API (mapreduce package). Key 
and value types
         will be inferred if not specified. Keys and values are converted for 
output using either
-        user specified converters or 
L{org.apache.spark.api.python.JavaToWritableConverter}. The
-        C{conf} is applied on top of the base Hadoop conf associated with the 
SparkContext
+        user specified converters or 
"org.apache.spark.api.python.JavaToWritableConverter". The
+        `conf` is applied on top of the base Hadoop conf associated with the 
SparkContext
         of this RDD to create a merged Hadoop MapReduce job configuration for 
saving the data.
 
         :param path: path to Hadoop file
@@ -1511,10 +1511,10 @@ class RDD(object):
 
     def saveAsHadoopDataset(self, conf, keyConverter=None, 
valueConverter=None):
         """
-        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any 
Hadoop file
+        Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to 
any Hadoop file
         system, using the old Hadoop OutputFormat API (mapred package). 
Keys/values are
         converted for output using either user specified converters or, by 
default,
-        L{org.apache.spark.api.python.JavaToWritableConverter}.
+        "org.apache.spark.api.python.JavaToWritableConverter".
 
         :param conf: Hadoop job configuration, passed in as a dict
         :param keyConverter: (None by default)
@@ -1529,11 +1529,11 @@ class RDD(object):
                          keyConverter=None, valueConverter=None, conf=None,
                          compressionCodecClass=None):
         """
-        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any 
Hadoop file
+        Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to 
any Hadoop file
         system, using the old Hadoop OutputFormat API (mapred package). Key 
and value types
         will be inferred if not specified. Keys and values are converted for 
output using either
-        user specified converters or 
L{org.apache.spark.api.python.JavaToWritableConverter}. The
-        C{conf} is applied on top of the base Hadoop conf associated with the 
SparkContext
+        user specified converters or 
"org.apache.spark.api.python.JavaToWritableConverter". The
+        `conf` is applied on top of the base Hadoop conf associated with the 
SparkContext
         of this RDD to create a merged Hadoop MapReduce job configuration for 
saving the data.
 
         :param path: path to Hadoop file
@@ -1558,8 +1558,8 @@ class RDD(object):
 
     def saveAsSequenceFile(self, path, compressionCodecClass=None):
         """
-        Output a Python RDD of key-value pairs (of form C{RDD[(K, V)]}) to any 
Hadoop file
-        system, using the L{org.apache.hadoop.io.Writable} types that we 
convert from the
+        Output a Python RDD of key-value pairs (of form ``RDD[(K, V)]``) to 
any Hadoop file
+        system, using the "org.apache.hadoop.io.Writable" types that we 
convert from the
         RDD's key and value types. The mechanism is as follows:
 
             1. Pyrolite is used to convert pickled Python RDD into RDD of Java 
objects.
@@ -1575,7 +1575,7 @@ class RDD(object):
     def saveAsPickleFile(self, path, batchSize=10):
         """
         Save this RDD as a SequenceFile of serialized objects. The serializer
-        used is L{pyspark.serializers.PickleSerializer}, default batch size
+        used is :class:`pyspark.serializers.PickleSerializer`, default batch 
size
         is 10.
 
         >>> tmpFile = NamedTemporaryFile(delete=True)
@@ -1595,8 +1595,8 @@ class RDD(object):
         """
         Save this RDD as a text file, using string representations of elements.
 
-        @param path: path to text file
-        @param compressionCodecClass: (None by default) string i.e.
+        :param path: path to text file
+        :param compressionCodecClass: (None by default) string i.e.
             "org.apache.hadoop.io.compress.GzipCodec"
 
         >>> tempFile = NamedTemporaryFile(delete=True)
@@ -1685,8 +1685,8 @@ class RDD(object):
         This will also perform the merging locally on each mapper before
         sending results to a reducer, similarly to a "combiner" in MapReduce.
 
-        Output will be partitioned with C{numPartitions} partitions, or
-        the default parallelism level if C{numPartitions} is not specified.
+        Output will be partitioned with `numPartitions` partitions, or
+        the default parallelism level if `numPartitions` is not specified.
         Default partitioner is hash-partition.
 
         >>> from operator import add
@@ -1737,10 +1737,10 @@ class RDD(object):
     def join(self, other, numPartitions=None):
         """
         Return an RDD containing all pairs of elements with matching keys in
-        C{self} and C{other}.
+        `self` and `other`.
 
         Each pair of elements will be returned as a (k, (v1, v2)) tuple, where
-        (k, v1) is in C{self} and (k, v2) is in C{other}.
+        (k, v1) is in `self` and (k, v2) is in `other`.
 
         Performs a hash join across the cluster.
 
@@ -1753,11 +1753,11 @@ class RDD(object):
 
     def leftOuterJoin(self, other, numPartitions=None):
         """
-        Perform a left outer join of C{self} and C{other}.
+        Perform a left outer join of `self` and `other`.
 
-        For each element (k, v) in C{self}, the resulting RDD will either
-        contain all pairs (k, (v, w)) for w in C{other}, or the pair
-        (k, (v, None)) if no elements in C{other} have key k.
+        For each element (k, v) in `self`, the resulting RDD will either
+        contain all pairs (k, (v, w)) for w in `other`, or the pair
+        (k, (v, None)) if no elements in `other` have key k.
 
         Hash-partitions the resulting RDD into the given number of partitions.
 
@@ -1770,11 +1770,11 @@ class RDD(object):
 
     def rightOuterJoin(self, other, numPartitions=None):
         """
-        Perform a right outer join of C{self} and C{other}.
+        Perform a right outer join of `self` and `other`.
 
-        For each element (k, w) in C{other}, the resulting RDD will either
+        For each element (k, w) in `other`, the resulting RDD will either
         contain all pairs (k, (v, w)) for v in this, or the pair (k, (None, w))
-        if no elements in C{self} have key k.
+        if no elements in `self` have key k.
 
         Hash-partitions the resulting RDD into the given number of partitions.
 
@@ -1787,15 +1787,15 @@ class RDD(object):
 
     def fullOuterJoin(self, other, numPartitions=None):
         """
-        Perform a right outer join of C{self} and C{other}.
+        Perform a full outer join of `self` and `other`.
 
-        For each element (k, v) in C{self}, the resulting RDD will either
-        contain all pairs (k, (v, w)) for w in C{other}, or the pair
-        (k, (v, None)) if no elements in C{other} have key k.
+        For each element (k, v) in `self`, the resulting RDD will either
+        contain all pairs (k, (v, w)) for w in `other`, or the pair
+        (k, (v, None)) if no elements in `other` have key k.
 
-        Similarly, for each element (k, w) in C{other}, the resulting RDD will
-        either contain all pairs (k, (v, w)) for v in C{self}, or the pair
-        (k, (None, w)) if no elements in C{self} have key k.
+        Similarly, for each element (k, w) in `other`, the resulting RDD will
+        either contain all pairs (k, (v, w)) for v in `self`, or the pair
+        (k, (None, w)) if no elements in `self` have key k.
 
         Hash-partitions the resulting RDD into the given number of partitions.
 
@@ -1891,11 +1891,11 @@ class RDD(object):
 
         Users provide three functions:
 
-            - C{createCombiner}, which turns a V into a C (e.g., creates
+            - `createCombiner`, which turns a V into a C (e.g., creates
               a one-element list)
-            - C{mergeValue}, to merge a V into a C (e.g., adds it to the end of
+            - `mergeValue`, to merge a V into a C (e.g., adds it to the end of
               a list)
-            - C{mergeCombiners}, to combine two C's into a single one (e.g., 
merges
+            - `mergeCombiners`, to combine two C's into a single one (e.g., 
merges
               the lists)
 
         To avoid memory allocation, both mergeValue and mergeCombiners are 
allowed to
@@ -2072,9 +2072,9 @@ class RDD(object):
     # TODO: add variant with custom partitioner
     def cogroup(self, other, numPartitions=None):
         """
-        For each key k in C{self} or C{other}, return a resulting RDD that
-        contains a tuple with the list of values for that key in C{self} as
-        well as C{other}.
+        For each key k in `self` or `other`, return a resulting RDD that
+        contains a tuple with the list of values for that key in `self` as
+        well as `other`.
 
         >>> x = sc.parallelize([("a", 1), ("b", 4)])
         >>> y = sc.parallelize([("a", 2)])
@@ -2106,8 +2106,8 @@ class RDD(object):
 
     def subtractByKey(self, other, numPartitions=None):
         """
-        Return each (key, value) pair in C{self} that has no pair with matching
-        key in C{other}.
+        Return each (key, value) pair in `self` that has no pair with matching
+        key in `other`.
 
         >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 2)])
         >>> y = sc.parallelize([("a", 3), ("c", None)])
@@ -2121,7 +2121,7 @@ class RDD(object):
 
     def subtract(self, other, numPartitions=None):
         """
-        Return each value in C{self} that is not contained in C{other}.
+        Return each value in `self` that is not contained in `other`.
 
         >>> x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
         >>> y = sc.parallelize([("a", 3), ("c", None)])
@@ -2134,7 +2134,7 @@ class RDD(object):
 
     def keyBy(self, f):
         """
-        Creates tuples of the elements in this RDD by applying C{f}.
+        Creates tuples of the elements in this RDD by applying `f`.
 
         >>> x = sc.parallelize(range(0,3)).keyBy(lambda x: x*x)
         >>> y = sc.parallelize(zip(range(0,5), range(0,5)))
@@ -2260,7 +2260,7 @@ class RDD(object):
         Items in the kth partition will get ids k, n+k, 2*n+k, ..., where
         n is the number of partitions. So there may exist gaps, but this
         method won't trigger a spark job, which is different from
-        L{zipWithIndex}
+        :meth:`zipWithIndex`.
 
         >>> sc.parallelize(["a", "b", "c", "d", "e"], 
3).zipWithUniqueId().collect()
         [('a', 0), ('b', 1), ('c', 4), ('d', 2), ('e', 5)]
diff --git a/python/pyspark/serializers.py b/python/pyspark/serializers.py
index ddca2a7..00f6081 100644
--- a/python/pyspark/serializers.py
+++ b/python/pyspark/serializers.py
@@ -19,12 +19,12 @@
 PySpark supports custom serializers for transferring data; this can improve
 performance.
 
-By default, PySpark uses L{PickleSerializer} to serialize objects using 
Python's
-C{cPickle} serializer, which can serialize nearly any Python object.
-Other serializers, like L{MarshalSerializer}, support fewer datatypes but can 
be
+By default, PySpark uses :class:`PickleSerializer` to serialize objects using 
Python's
+`cPickle` serializer, which can serialize nearly any Python object.
+Other serializers, like :class:`MarshalSerializer`, support fewer datatypes 
but can be
 faster.
 
-The serializer is chosen when creating L{SparkContext}:
+The serializer is chosen when creating :class:`SparkContext`:
 
 >>> from pyspark.context import SparkContext
 >>> from pyspark.serializers import MarshalSerializer
@@ -34,7 +34,7 @@ The serializer is chosen when creating L{SparkContext}:
 >>> sc.stop()
 
 PySpark serializes objects in batches; by default, the batch size is chosen 
based
-on the size of objects and is also configurable by SparkContext's C{batchSize}
+on the size of objects and is also configurable by SparkContext's `batchSize`
 parameter:
 
 >>> sc = SparkContext('local', 'test', batchSize=2)
@@ -129,7 +129,7 @@ class FramedSerializer(Serializer):
 
     """
     Serializer that writes objects as a stream of (length, data) pairs,
-    where C{length} is a 32-bit integer and data is C{length} bytes.
+    where `length` is a 32-bit integer and data is `length` bytes.
     """
 
     def __init__(self):
diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py
index e666973..87d4b81 100644
--- a/python/pyspark/sql/dataframe.py
+++ b/python/pyspark/sql/dataframe.py
@@ -408,7 +408,7 @@ class DataFrame(object):
         """Returns a checkpointed version of this Dataset. Checkpointing can 
be used to truncate the
         logical plan of this DataFrame, which is especially useful in 
iterative algorithms where the
         plan may grow exponentially. It will be saved to files inside the 
checkpoint
-        directory set with L{SparkContext.setCheckpointDir()}.
+        directory set with :meth:`SparkContext.setCheckpointDir`.
 
         :param eager: Whether to checkpoint this DataFrame immediately
 
@@ -581,9 +581,9 @@ class DataFrame(object):
 
     @since(1.3)
     def cache(self):
-        """Persists the :class:`DataFrame` with the default storage level 
(C{MEMORY_AND_DISK}).
+        """Persists the :class:`DataFrame` with the default storage level 
(`MEMORY_AND_DISK`).
 
-        .. note:: The default storage level has changed to C{MEMORY_AND_DISK} 
to match Scala in 2.0.
+        .. note:: The default storage level has changed to `MEMORY_AND_DISK` 
to match Scala in 2.0.
         """
         self.is_cached = True
         self._jdf.cache()
@@ -594,9 +594,9 @@ class DataFrame(object):
         """Sets the storage level to persist the contents of the 
:class:`DataFrame` across
         operations after the first time it is computed. This can only be used 
to assign
         a new storage level if the :class:`DataFrame` does not have a storage 
level set yet.
-        If no storage level is specified defaults to (C{MEMORY_AND_DISK}).
+        If no storage level is specified defaults to (`MEMORY_AND_DISK`).
 
-        .. note:: The default storage level has changed to C{MEMORY_AND_DISK} 
to match Scala in 2.0.
+        .. note:: The default storage level has changed to `MEMORY_AND_DISK` 
to match Scala in 2.0.
         """
         self.is_cached = True
         javaStorageLevel = self._sc._getJavaStorageLevel(storageLevel)
diff --git a/python/pyspark/sql/types.py b/python/pyspark/sql/types.py
index f9b12f1..da84fc1 100644
--- a/python/pyspark/sql/types.py
+++ b/python/pyspark/sql/types.py
@@ -1405,7 +1405,7 @@ def _create_row(fields, values):
 class Row(tuple):
 
     """
-    A row in L{DataFrame}.
+    A row in :class:`DataFrame`.
     The fields in it can be accessed:
 
     * like attributes (``row.key``)
diff --git a/python/pyspark/streaming/context.py 
b/python/pyspark/streaming/context.py
index 6fbe26b6..769121c 100644
--- a/python/pyspark/streaming/context.py
+++ b/python/pyspark/streaming/context.py
@@ -33,7 +33,7 @@ class StreamingContext(object):
     """
     Main entry point for Spark Streaming functionality. A StreamingContext
     represents the connection to a Spark cluster, and can be used to create
-    L{DStream} various input sources. It can be from an existing 
L{SparkContext}.
+    :class:`DStream` from various input sources. It can be created from an existing 
:class:`SparkContext`.
     After creating and transforming DStreams, the streaming computation can
     be started and stopped using `context.start()` and `context.stop()`,
     respectively. `context.awaitTermination()` allows the current thread
@@ -48,8 +48,8 @@ class StreamingContext(object):
         """
         Create a new StreamingContext.
 
-        @param sparkContext: L{SparkContext} object.
-        @param batchDuration: the time interval (in seconds) at which streaming
+        :param sparkContext: :class:`SparkContext` object.
+        :param batchDuration: the time interval (in seconds) at which streaming
                               data will be divided into batches
         """
 
@@ -92,8 +92,8 @@ class StreamingContext(object):
         recreated from the checkpoint data. If the data does not exist, then 
the provided setupFunc
         will be used to create a new context.
 
-        @param checkpointPath: Checkpoint directory used in an earlier 
streaming program
-        @param setupFunc:      Function to create a new context and setup 
DStreams
+        :param checkpointPath: Checkpoint directory used in an earlier 
streaming program
+        :param setupFunc:      Function to create a new context and setup 
DStreams
         """
         cls._ensure_initialized()
         gw = SparkContext._gateway
@@ -149,10 +149,10 @@ class StreamingContext(object):
         valid checkpoint data, then setupFunc will be called to create a new 
context and setup
         DStreams.
 
-        @param checkpointPath: Checkpoint directory used in an earlier 
streaming program. Can be
+        :param checkpointPath: Checkpoint directory used in an earlier 
streaming program. Can be
                                None if the intention is to always create a new 
context when there
                                is no active context.
-        @param setupFunc:      Function to create a new JavaStreamingContext 
and setup DStreams
+        :param setupFunc:      Function to create a new JavaStreamingContext 
and setup DStreams
         """
 
         if setupFunc is None:
@@ -183,7 +183,7 @@ class StreamingContext(object):
         """
         Wait for the execution to stop.
 
-        @param timeout: time to wait in seconds
+        :param timeout: time to wait in seconds
         """
         if timeout is None:
             self._jssc.awaitTermination()
@@ -196,7 +196,7 @@ class StreamingContext(object):
         throw the reported error during the execution; or `false` if the
         waiting time elapsed before returning from the method.
 
-        @param timeout: time to wait in seconds
+        :param timeout: time to wait in seconds
         """
         return self._jssc.awaitTerminationOrTimeout(int(timeout * 1000))
 
@@ -205,8 +205,8 @@ class StreamingContext(object):
         Stop the execution of the streams, with option of ensuring all
         received data has been processed.
 
-        @param stopSparkContext: Stop the associated SparkContext or not
-        @param stopGracefully: Stop gracefully by waiting for the processing
+        :param stopSparkContext: Stop the associated SparkContext or not
+        :param stopGracefully: Stop gracefully by waiting for the processing
                               of all received data to be completed
         """
         self._jssc.stop(stopSparkContext, stopGraceFully)
@@ -223,7 +223,7 @@ class StreamingContext(object):
         the RDDs (if the developer wishes to query old data outside the
         DStream computation).
 
-        @param duration: Minimum duration (in seconds) that each DStream
+        :param duration: Minimum duration (in seconds) that each DStream
                         should remember its RDDs
         """
         self._jssc.remember(self._jduration(duration))
@@ -233,7 +233,7 @@ class StreamingContext(object):
         Sets the context to periodically checkpoint the DStream operations for 
master
         fault-tolerance. The graph will be checkpointed every batch interval.
 
-        @param directory: HDFS-compatible directory where the checkpoint data
+        :param directory: HDFS-compatible directory where the checkpoint data
                          will be reliably stored
         """
         self._jssc.checkpoint(directory)
@@ -244,9 +244,9 @@ class StreamingContext(object):
         a TCP socket and receive byte is interpreted as UTF8 encoded ``\\n`` 
delimited
         lines.
 
-        @param hostname:      Hostname to connect to for receiving data
-        @param port:          Port to connect to for receiving data
-        @param storageLevel:  Storage level to use for storing the received 
objects
+        :param hostname:      Hostname to connect to for receiving data
+        :param port:          Port to connect to for receiving data
+        :param storageLevel:  Storage level to use for storing the received 
objects
         """
         jlevel = self._sc._getJavaStorageLevel(storageLevel)
         return DStream(self._jssc.socketTextStream(hostname, port, jlevel), 
self,
@@ -270,8 +270,8 @@ class StreamingContext(object):
         them from another location within the same file system.
         File names starting with . are ignored.
 
-        @param directory:       Directory to load data from
-        @param recordLength:    Length of each record in bytes
+        :param directory:       Directory to load data from
+        :param recordLength:    Length of each record in bytes
         """
         return DStream(self._jssc.binaryRecordsStream(directory, 
recordLength), self,
                        NoOpSerializer())
@@ -290,9 +290,9 @@ class StreamingContext(object):
 
         .. note:: Changes to the queue after the stream is created will not be 
recognized.
 
-        @param rdds:       Queue of RDDs
-        @param oneAtATime: pick one rdd each time or pick all of them once.
-        @param default:    The default rdd if no more in rdds
+        :param rdds:       Queue of RDDs
+        :param oneAtATime: pick one rdd each time or pick all of them once.
+        :param default:    The default rdd if no more in rdds
         """
         if default and not isinstance(default, RDD):
             default = self._sc.parallelize(default)
diff --git a/python/pyspark/streaming/dstream.py 
b/python/pyspark/streaming/dstream.py
index c253e5c..60562a6 100644
--- a/python/pyspark/streaming/dstream.py
+++ b/python/pyspark/streaming/dstream.py
@@ -41,11 +41,11 @@ class DStream(object):
     """
     A Discretized Stream (DStream), the basic abstraction in Spark Streaming,
     is a continuous sequence of RDDs (of the same type) representing a
-    continuous stream of data (see L{RDD} in the Spark core documentation
+    continuous stream of data (see :class:`RDD` in the Spark core documentation
     for more details on RDDs).
 
     DStreams can either be created from live data (such as, data from TCP
-    sockets, etc.) using a L{StreamingContext} or it can be
+    sockets, etc.) using a :class:`StreamingContext` or it can be
     generated by transforming existing DStreams using operations such as
     `map`, `window` and `reduceByKeyAndWindow`. While a Spark Streaming
     program is running, each DStream periodically generates a RDD, either
@@ -167,7 +167,7 @@ class DStream(object):
         """
         Print the first num elements of each RDD generated in this DStream.
 
-        @param num: the number of elements from the first will be printed.
+        :param num: the number of elements from the first will be printed.
         """
         def takeAndPrint(time, rdd):
             taken = rdd.take(num + 1)
@@ -210,7 +210,7 @@ class DStream(object):
     def cache(self):
         """
         Persist the RDDs of this DStream with the default storage level
-        (C{MEMORY_ONLY}).
+        (`MEMORY_ONLY`).
         """
         self.is_cached = True
         self.persist(StorageLevel.MEMORY_ONLY)
@@ -229,7 +229,7 @@ class DStream(object):
         """
         Enable periodic checkpointing of RDDs of this DStream
 
-        @param interval: time in seconds, after each period of that, generated
+        :param interval: time in seconds, after each period of that, generated
                          RDD will be checkpointed
         """
         self.is_checkpointed = True
@@ -333,7 +333,7 @@ class DStream(object):
         """
         Return a new DStream by unifying data of another DStream with this 
DStream.
 
-        @param other: Another DStream having the same interval (i.e., 
slideDuration)
+        :param other: Another DStream having the same interval (i.e., 
slideDuration)
                      as this DStream.
         """
         if self._slideDuration != other._slideDuration:
@@ -429,9 +429,9 @@ class DStream(object):
         Return a new DStream in which each RDD contains all the elements 
seen in a
         sliding window of time over this DStream.
 
-        @param windowDuration: width of the window; must be a multiple of this 
DStream's
+        :param windowDuration: width of the window; must be a multiple of this 
DStream's
                               batching interval
-        @param slideDuration:  sliding interval of the window (i.e., the 
interval after which
+        :param slideDuration:  sliding interval of the window (i.e., the 
interval after which
                               the new DStream will generate RDDs); must be a 
multiple of this
                               DStream's batching interval
         """
@@ -455,13 +455,13 @@ class DStream(object):
         2. "inverse reduce" the old values that left the window (e.g., 
subtracting old counts)
         This is more efficient than when `invReduceFunc` is None.
 
-        @param reduceFunc:     associative and commutative reduce function
-        @param invReduceFunc:  inverse reduce function of `reduceFunc`; such 
that for all y,
+        :param reduceFunc:     associative and commutative reduce function
+        :param invReduceFunc:  inverse reduce function of `reduceFunc`; such 
that for all y,
                                and invertible x:
                                `invReduceFunc(reduceFunc(x, y), x) = y`
-        @param windowDuration: width of the window; must be a multiple of this 
DStream's
+        :param windowDuration: width of the window; must be a multiple of this 
DStream's
                                batching interval
-        @param slideDuration:  sliding interval of the window (i.e., the 
interval after which
+        :param slideDuration:  sliding interval of the window (i.e., the 
interval after which
                                the new DStream will generate RDDs); must be a 
multiple of this
                                DStream's batching interval
         """
@@ -487,12 +487,12 @@ class DStream(object):
         Return a new DStream in which each RDD contains the count of distinct 
elements in
         RDDs in a sliding window over this DStream.
 
-        @param windowDuration: width of the window; must be a multiple of this 
DStream's
+        :param windowDuration: width of the window; must be a multiple of this 
DStream's
                               batching interval
-        @param slideDuration:  sliding interval of the window (i.e., the 
interval after which
+        :param slideDuration:  sliding interval of the window (i.e., the 
interval after which
                               the new DStream will generate RDDs); must be a 
multiple of this
                               DStream's batching interval
-        @param numPartitions:  number of partitions of each RDD in the new 
DStream.
+        :param numPartitions:  number of partitions of each RDD in the new 
DStream.
         """
         keyed = self.map(lambda x: (x, 1))
         counted = keyed.reduceByKeyAndWindow(operator.add, operator.sub,
@@ -504,12 +504,12 @@ class DStream(object):
         Return a new DStream by applying `groupByKey` over a sliding window.
         Similar to `DStream.groupByKey()`, but applies it over a sliding 
window.
 
-        @param windowDuration: width of the window; must be a multiple of this 
DStream's
+        :param windowDuration: width of the window; must be a multiple of this 
DStream's
                               batching interval
-        @param slideDuration:  sliding interval of the window (i.e., the 
interval after which
+        :param slideDuration:  sliding interval of the window (i.e., the 
interval after which
                               the new DStream will generate RDDs); must be a 
multiple of this
                               DStream's batching interval
-        @param numPartitions:  Number of partitions of each RDD in the new 
DStream.
+        :param numPartitions:  Number of partitions of each RDD in the new 
DStream.
         """
         ls = self.mapValues(lambda x: [x])
         grouped = ls.reduceByKeyAndWindow(lambda a, b: a.extend(b) or a, 
lambda a, b: a[len(b):],
@@ -528,15 +528,15 @@ class DStream(object):
         `invFunc` can be None, then it will reduce all the RDDs in window, 
could be slower
         than having `invFunc`.
 
-        @param func:           associative and commutative reduce function
-        @param invFunc:        inverse function of `reduceFunc`
-        @param windowDuration: width of the window; must be a multiple of this 
DStream's
+        :param func:           associative and commutative reduce function
+        :param invFunc:        inverse function of `reduceFunc`
+        :param windowDuration: width of the window; must be a multiple of this 
DStream's
                               batching interval
-        @param slideDuration:  sliding interval of the window (i.e., the 
interval after which
+        :param slideDuration:  sliding interval of the window (i.e., the 
interval after which
                               the new DStream will generate RDDs); must be a 
multiple of this
                               DStream's batching interval
-        @param numPartitions:  number of partitions of each RDD in the new 
DStream.
-        @param filterFunc:     function to filter expired key-value pairs;
+        :param numPartitions:  number of partitions of each RDD in the new 
DStream.
+        :param filterFunc:     function to filter expired key-value pairs;
                               only pairs that satisfy the function are retained
                               set this to null if you do not want to filter
         """
@@ -578,7 +578,7 @@ class DStream(object):
         Return a new "state" DStream where the state for each key is updated 
by applying
         the given function on the previous state of the key and the new values 
of the key.
 
-        @param updateFunc: State update function. If this function returns 
None, then
+        :param updateFunc: State update function. If this function returns 
None, then
                            corresponding state key-value pair will be 
eliminated.
         """
         if numPartitions is None:
diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py
index dff5e18..6d28491 100644
--- a/python/pyspark/taskcontext.py
+++ b/python/pyspark/taskcontext.py
@@ -28,7 +28,7 @@ class TaskContext(object):
 
     Contextual information about a task which can be read or mutated during
     execution. To access the TaskContext for a running task, use:
-    L{TaskContext.get()}.
+    :meth:`TaskContext.get`.
     """
 
     _taskContext = None
diff --git a/python/pyspark/testing/streamingutils.py 
b/python/pyspark/testing/streamingutils.py
index 4c27f8a..a6abc2e 100644
--- a/python/pyspark/testing/streamingutils.py
+++ b/python/pyspark/testing/streamingutils.py
@@ -137,9 +137,9 @@ class PySparkStreamingTestCase(unittest.TestCase):
 
     def _test_func(self, input, func, expected, sort=False, input2=None):
         """
-        @param input: dataset for the test. This should be list of lists.
-        @param func: wrapped function. This function should return 
PythonDStream object.
-        @param expected: expected output for this testcase.
+        :param input: dataset for the test. This should be list of lists.
+        :param func: wrapped function. This function should return 
PythonDStream object.
+        :param expected: expected output for this testcase.
         """
         if not isinstance(input[0], RDD):
             input = [self.sc.parallelize(d, 1) for d in input]


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to