This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 356623add8d [SPARK-40160][PYTHON][DOCS] Make pyspark.broadcast examples self-contained
356623add8d is described below

commit 356623add8d5fd47394b1018fe46fe3c0cb9f814
Author: Qian.Sun <qian.sun2...@gmail.com>
AuthorDate: Tue Aug 30 11:10:37 2022 +0900

    [SPARK-40160][PYTHON][DOCS] Make pyspark.broadcast examples self-contained

    ### What changes were proposed in this pull request?

    This PR proposes to improve the examples in `pyspark.broadcast` by making
    each example self-contained, with a brief explanation and a slightly more
    realistic example.

    ### Why are the changes needed?

    To make the documentation more readable and easy to copy and paste
    directly into the PySpark shell.

    ### Does this PR introduce _any_ user-facing change?

    Yes, it changes the documentation.

    ### How was this patch tested?

    Manually ran each doctest.

    Closes #37629 from dcoliversun/SPARK-40160.

    Authored-by: Qian.Sun <qian.sun2...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 python/pyspark/broadcast.py | 138 ++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 126 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/broadcast.py b/python/pyspark/broadcast.py
index edd282de92f..c163ad2eb77 100644
--- a/python/pyspark/broadcast.py
+++ b/python/pyspark/broadcast.py
@@ -70,16 +70,14 @@ class Broadcast(Generic[T]):
 
     Examples
     --------
-    >>> from pyspark.context import SparkContext
-    >>> sc = SparkContext('local', 'test')
-    >>> b = sc.broadcast([1, 2, 3, 4, 5])
+    >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
     >>> b.value
     [1, 2, 3, 4, 5]
-    >>> sc.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
+    >>> spark.sparkContext.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
     [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]
     >>> b.unpersist()
 
-    >>> large_broadcast = sc.broadcast(range(10000))
+    >>> large_broadcast = spark.sparkContext.broadcast(range(10000))
     """
 
     @overload  # On driver
@@ -149,6 +147,32 @@ class Broadcast(Generic[T]):
         self._path = path
 
     def dump(self, value: T, f: BinaryIO) -> None:
+        """
+        Write a pickled representation of the value to the open file or socket.
+        The pickle protocol used is HIGHEST_PROTOCOL.
+
+        Parameters
+        ----------
+        value : T
+            Value to write.
+
+        f : :class:`BinaryIO`
+            File or socket where the pickled value will be stored.
+
+        Examples
+        --------
+        >>> import os
+        >>> import tempfile
+
+        >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+
+        Write a pickled representation of `b` to the open temp file.
+
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     path = os.path.join(d, "test.txt")
+        ...     with open(path, "wb") as f:
+        ...         b.dump(b.value, f)
+        """
         try:
             pickle.dump(value, f, pickle_protocol)
         except pickle.PickleError:
@@ -160,11 +184,74 @@ class Broadcast(Generic[T]):
             f.close()
 
     def load_from_path(self, path: str) -> T:
+        """
+        Read the pickled representation of an object from the open file and
+        return the reconstituted object hierarchy specified therein.
+
+        Parameters
+        ----------
+        path : str
+            File path from which the pickled value is read.
+
+        Returns
+        -------
+        T
+            The object hierarchy reconstituted from the pickled
+            representation of an object.
+
+        Examples
+        --------
+        >>> import os
+        >>> import tempfile
+
+        >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+        >>> c = spark.sparkContext.broadcast(1)
+
+        Read the pickled representation of the value from the temp file.
+
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     path = os.path.join(d, "test.txt")
+        ...     with open(path, "wb") as f:
+        ...         b.dump(b.value, f)
+        ...     c.load_from_path(path)
+        [1, 2, 3, 4, 5]
+        """
         with open(path, "rb", 1 << 20) as f:
             return self.load(f)
 
     def load(self, file: BinaryIO) -> T:
-        # "file" could also be a socket
+        """
+        Read a pickled representation of the value from the open file or socket.
+
+        Parameters
+        ----------
+        file : :class:`BinaryIO`
+            File or socket from which the pickled value will be read.
+
+        Returns
+        -------
+        T
+            The object hierarchy reconstituted from the pickled
+            representation of an object.
+
+        Examples
+        --------
+        >>> import os
+        >>> import tempfile
+
+        >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+        >>> c = spark.sparkContext.broadcast(1)
+
+        Read the pickled representation of the value from the open temp file.
+
+        >>> with tempfile.TemporaryDirectory() as d:
+        ...     path = os.path.join(d, "test.txt")
+        ...     with open(path, "wb") as f:
+        ...         b.dump(b.value, f)
+        ...     with open(path, "rb") as f:
+        ...         c.load(f)
+        [1, 2, 3, 4, 5]
+        """
         gc.disable()
         try:
             return pickle.load(file)
@@ -194,8 +281,16 @@ class Broadcast(Generic[T]):
 
         Parameters
         ----------
-        blocking : bool, optional
-            Whether to block until unpersisting has completed
+        blocking : bool, optional, default False
+            Whether to block until unpersisting has completed.
+
+        Examples
+        --------
+        >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+
+        Delete cached copies of this broadcast on the executors.
+
+        >>> b.unpersist()
         """
         if self._jbroadcast is None:
             raise RuntimeError("Broadcast can only be unpersisted in driver")
@@ -213,8 +308,16 @@ class Broadcast(Generic[T]):
 
         Parameters
         ----------
-        blocking : bool, optional
-            Whether to block until unpersisting has completed
+        blocking : bool, optional, default False
+            Whether to block until unpersisting has completed.
+
+        Examples
+        --------
+        >>> b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
+
+        Destroy all data and metadata related to this broadcast variable.
+
+        >>> b.destroy()
         """
         if self._jbroadcast is None:
             raise RuntimeError("Broadcast can only be destroyed in driver")
@@ -246,9 +349,20 @@ class BroadcastPickleRegistry(threading.local):
             self._registry.clear()
 
 
-if __name__ == "__main__":
+def _test() -> None:
     import doctest
+    from pyspark.sql import SparkSession
+    import pyspark.broadcast
+
+    globs = pyspark.broadcast.__dict__.copy()
+    spark = SparkSession.builder.master("local[4]").appName("broadcast tests").getOrCreate()
+    globs["spark"] = spark
 
-    (failure_count, test_count) = doctest.testmod()
+    (failure_count, test_count) = doctest.testmod(pyspark.broadcast, globs=globs)
+    spark.stop()
    if failure_count:
         sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
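
For readers who want to try the new examples outside of the doctest runner, below is a minimal self-contained sketch that mirrors the API surface exercised above. It is not part of the commit; the `local[4]` master, the app name, and the "data.bin" file name are illustrative assumptions, and a local `pyspark` installation is assumed.

import os
import tempfile

from pyspark.sql import SparkSession

# Start a local session; the master and app name here are illustrative.
spark = SparkSession.builder.master("local[4]").appName("broadcast demo").getOrCreate()

# Create a broadcast variable and read its value back on the driver.
b = spark.sparkContext.broadcast([1, 2, 3, 4, 5])
assert b.value == [1, 2, 3, 4, 5]

# Reference the broadcast value from executor-side code.
out = spark.sparkContext.parallelize([0, 0]).flatMap(lambda x: b.value).collect()
assert out == [1, 2, 3, 4, 5, 1, 2, 3, 4, 5]

# Round-trip the value through a pickled file, as the dump/load_from_path
# doctests do; "data.bin" is an arbitrary file name.
c = spark.sparkContext.broadcast(1)
with tempfile.TemporaryDirectory() as d:
    path = os.path.join(d, "data.bin")
    with open(path, "wb") as f:
        b.dump(b.value, f)
    assert c.load_from_path(path) == [1, 2, 3, 4, 5]

# Drop cached executor copies, then remove the variable entirely.
b.unpersist()
b.destroy()
spark.stop()

Like the new doctests, this relies only on a SparkSession being available; that is the point of the change, since each example can now be pasted directly into a PySpark shell where `spark` already exists.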