This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 356623add8d [SPARK-40160][PYTHON][DOCS] Make pyspark.broadcast examples self-contained
356623add8d is described below

commit 356623add8d5fd47394b1018fe46fe3c0cb9f814
Author: Qian.Sun <qian.sun2...@gmail.com>
AuthorDate: Tue Aug 30 11:10:37 2022 +0900

    [SPARK-40160][PYTHON][DOCS] Make pyspark.broadcast examples self-contained
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to improve the examples in `pyspark.broadcast` by making each example self-contained, with a brief explanation and a slightly more realistic example.
    
    ### Why are the changes needed?
    
    To make the documentation more readable and directly copy-pasteable into the PySpark shell.
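
    For illustration, a minimal sketch of that workflow, taken from the new `Broadcast` docstring example (assuming the `spark` session that the PySpark shell provides by default):

        >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
        >>> b.value
        [1, 2, 3, 4, 5]
        >>> b.unpersist()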
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, it changes the documentation.
    
    ### How was this patch tested?
    
    Manually ran each doctest.
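
    With the new `_test()` harness at the bottom of the module, the doctests can also be exercised by running the file directly (invocation assumed; it runs `doctest.testmod` against a local SparkSession and exits non-zero on failure):

        python python/pyspark/broadcast.py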
    
    Closes #37629 from dcoliversun/SPARK-40160.
    
    Authored-by: Qian.Sun <qian.sun2...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/broadcast.py | 138 ++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 126 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index edd282de92f..c163ad2eb77 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -70,16 +70,14 @@ class Broadcast(Generic[T]):
 
     Examples
     --------
-    >>> from pyspark.context import SparkContext
-    >>> sc = SparkContext('local', 'test')
-    >>> b = sc.broadcast([1, 2, 3, 4, 5])
+    >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
     >>> b.value
     [1, 2, 3, 4, 5]
-    >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
+    >>> spark.sparkContext.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
     [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
     >>> b.unpersist()
 
-    >>> large_broadcast = sc.broadcast(range(10000))
+    >>> large_broadcast = spark.sparkContext.broadcast(range(10000))
     """
 
     @overload  # On driver
@@ -149,6 +147,32 @@ class Broadcast(Generic[T]):
                 self._path = path
 
     def dump(self, value: T, f: BinaryIO) -> None:
+        """
+        Write a pickled representation of the value to the open file or socket.
+        The pickle protocol used is pickle.HIGHEST_PROTOCOL.
+
+        Parameters
+        ----------
+        value : T
+            Value to write.
+
+        f : :class:`BinaryIO`
+            File or socket where the pickled value will be stored.
+
+        Examples
+        --------
+        >>> import os
+        >>> import tempfile
+
+        >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+
+        Write a pickled representation of `b.value` to the open temp file.
+
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     path = os.path.join(d, "test.txt")
+        ...     with open(path, "wb") as f:
+        ...         b.dump(b.value, f)
+        """
         try:
             pickle.dump(value, f, pickle_protocol)
         except pickle.PickleError:
@@ -160,11 +184,74 @@ class Broadcast(Generic[T]):
         f.close()
 
     def load_from_path(self, path: str) -> T:
+        """
+        Read the pickled representation of an object from the file at the
+        given path and return the reconstituted object hierarchy specified
+        therein.
+
+        Parameters
+        ----------
+        path : str
+            File path from which to read the pickled value.
+
+        Returns
+        -------
+        T
+            The object hierarchy reconstituted from the pickled
+            representation of the object.
+
+        Examples
+        --------
+        >>> import os
+        >>> import tempfile
+
+        >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+        >>> c = spark.sparkContext.broadcast(1)
+
+        Read the pickled representation of the value from the temp file.
+
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     path = os.path.join(d, "test.txt")
+        ...     with open(path, "wb") as f:
+        ...         b.dump(b.value, f)
+        ...     c.load_from_path(path)
+        [1, 2, 3, 4, 5]
+        """
         with open(path, "rb", 1 << 20) as f:
             return self.load(f)
 
     def load(self, file: BinaryIO) -> T:
-        # "file" could also be a socket
+        """
+        Read a pickled representation of a value from the open file or socket.
+
+        Parameters
+        ----------
+        file : :class:`BinaryIO`
+            File or socket from which the pickled value will be read.
+
+        Returns
+        -------
+        T
+            The object hierarchy reconstituted from the pickled
+            representation of the object.
+
+        Examples
+        --------
+        >>> import os
+        >>> import tempfile
+
+        >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+        >>> c = spark.sparkContext.broadcast(1)
+
+        Read the pickled representation of the value from the open temp file.
+
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     path = os.path.join(d, "test.txt")
+        ...     with open(path, "wb") as f:
+        ...         b.dump(b.value, f)
+        ...     with open(path, "rb") as f:
+        ...         c.load(f)
+        [1, 2, 3, 4, 5]
+        """
         gc.disable()
         try:
             return pickle.load(file)
@@ -194,8 +281,16 @@ class Broadcast(Generic[T]):
 
         Parameters
         ----------
-        blocking : bool, optional
-            Whether to block until unpersisting has completed
+        blocking : bool, optional, default False
+            Whether to block until unpersisting has completed.
+
+        Examples
+        --------
+        >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+
+        Delete cached copies of this broadcast on the executors.
+
+        >>> b.unpersist()
         """
         if self._jbroadcast is None:
             raise RuntimeError("Broadcast can only be unpersisted in driver")
@@ -213,8 +308,16 @@ class Broadcast(Generic[T]):
 
         Parameters
         ----------
-        blocking : bool, optional
-            Whether to block until unpersisting has completed
+        blocking : bool, optional, default False
+            Whether to block until unpersisting has completed.
+
+        Examples
+        --------
+        >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+
+        Destroy all data and metadata related to this broadcast variable.
+
+        >>> b.destroy()
         """
         if self._jbroadcast is None:
             raise RuntimeError("Broadcast can only be destroyed in driver")
@@ -246,9 +349,20 @@ class BroadcastPickleRegistry(threading.local):
         self._registry.clear()
 
 
-if __name__ == "__main__":
+def _test() -> None:
     import doctest
+    from pyspark.sql import SparkSession
+    import pyspark.broadcast
+
+    globs = pyspark.broadcast.__dict__.copy()
+    spark = SparkSession.builder.master("local[4]").appName("broadcast tests").getOrCreate()
+    globs["spark"] = spark
 
-    (failure_count, test_count) = doctest.testmod()
+    (failure_count, test_count) = doctest.testmod(pyspark.broadcast, globs=globs)
+    spark.stop()
     if failure_count:
         sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
