This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 10e600f04ae [SPARK-40077][PYTHON][DOCS] Make pyspark.context examples self-contained 10e600f04ae is described below commit 10e600f04ae194b7857c9822926784b88b160f10 Author: Ruifeng Zheng <ruife...@apache.org> AuthorDate: Tue Aug 16 09:37:28 2022 +0900 [SPARK-40077][PYTHON][DOCS] Make pyspark.context examples self-contained ### What changes were proposed in this pull request? 1, Make pyspark.context examples self-contained 2, add missing `versionadded` comments 3, add `see-also` sections ### Why are the changes needed? To make the documentation more readable and able to copy and paste directly in PySpark shell. ### Does this PR introduce _any_ user-facing change? documents were changed ### How was this patch tested? - added doctests. - manually copy-paste test the example in PySpark Shell. - build the documents and manually checks Closes #37517 from zhengruifeng/py_doc_sc_self_contained. Lead-authored-by: Ruifeng Zheng <ruife...@apache.org> Co-authored-by: Ruifeng Zheng <ruife...@foxmail.com> Signed-off-by: Hyukjin Kwon <gurwls...@apache.org> --- python/pyspark/context.py | 1028 +++++++++++++++++++++++++++++++++++++++------ 1 file changed, 903 insertions(+), 125 deletions(-) diff --git a/python/pyspark/context.py b/python/pyspark/context.py index 032efaef492..67ac8c720cd 100644 --- a/python/pyspark/context.py +++ b/python/pyspark/context.py @@ -45,7 +45,7 @@ from typing import ( from py4j.java_collections import JavaMap from py4j.protocol import Py4JError -from pyspark import accumulators, since +from pyspark import accumulators from pyspark.accumulators import Accumulator from pyspark.broadcast import Broadcast, BroadcastPickleRegistry from pyspark.conf import SparkConf @@ -112,26 +112,24 @@ class SparkContext: environment : dict, optional A dictionary of environment variables to set on worker nodes. - batchSize : int, optional + batchSize : int, optional, default 0 The number of Python objects represented as a single Java object. Set 1 to disable batching, 0 to automatically choose the batch size based on object sizes, or -1 to use an unlimited batch size - serializer : :class:`pyspark.serializers.Serializer`, optional + serializer : :class:`Serializer`, optional, default :class:`CPickleSerializer` The serializer for RDDs. - conf : :py:class:`pyspark.SparkConf`, optional + conf : :class:`SparkConf`, optional An object setting Spark properties. - gateway : :py:class:`py4j.java_gateway.JavaGateway`, optional + gateway : class:`py4j.java_gateway.JavaGateway`, optional Use an existing gateway and JVM, otherwise a new JVM will be instantiated. This is only used internally. - jsc : :py:class:`py4j.java_gateway.JavaObject`, optional + jsc : class:`py4j.java_gateway.JavaObject`, optional The JavaSparkContext instance. This is only used internally. - profiler_cls : type, optional + profiler_cls : type, optional, default :class:`BasicProfiler` A class of custom Profiler used to do profiling - (default is :class:`pyspark.profiler.BasicProfiler`). - udf_profiler_cls : type, optional + udf_profiler_cls : type, optional, default :class:`UDFBasicProfiler` A class of custom Profiler used to do udf profiling - (default is :class:`pyspark.profiler.UDFBasicProfiler`). Notes ----- @@ -477,11 +475,25 @@ class SparkContext: @classmethod def getOrCreate(cls, conf: Optional[SparkConf] = None) -> "SparkContext": """ - Get or instantiate a SparkContext and register it as a singleton object. 
+ Get or instantiate a :class:`SparkContext` and register it as a singleton object. + + .. versionadded:: 1.4.0 Parameters ---------- - conf : :py:class:`pyspark.SparkConf`, optional + conf : :class:`SparkConf`, optional + :class:`SparkConf` that will be used for initialization of the :class:`SparkContext`. + + Returns + ------- + :class:`SparkContext` + current :class:`SparkContext`, or a new one if it wasn't created before the function + call. + + Examples + -------- + >>> SparkContext.getOrCreate() + <SparkContext ...> """ with SparkContext._lock: if SparkContext._active_spark_context is None: @@ -493,14 +505,34 @@ class SparkContext: """ Control our logLevel. This overrides any user-defined log settings. Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN + + .. versionadded:: 1.4.0 + + Parameters + ---------- + logLevel : str + The desired log level as a string. + + Examples + -------- + >>> sc.setLogLevel("WARN") # doctest :+SKIP """ self._jsc.setLogLevel(logLevel) @classmethod def setSystemProperty(cls, key: str, value: str) -> None: """ - Set a Java system property, such as spark.executor.memory. This must - must be invoked before instantiating SparkContext. + Set a Java system property, such as `spark.executor.memory`. This must + be invoked before instantiating :class:`SparkContext`. + + .. versionadded:: 0.9.0 + + Parameters + ---------- + key : str + The key of a new Java system property. + value : str + The value of a new Java system property. """ SparkContext._ensure_initialized() assert SparkContext._jvm is not None @@ -510,6 +542,12 @@ class SparkContext: def version(self) -> str: """ The version of Spark on which this application is running. + + .. versionadded:: 1.1.0 + + Examples + -------- + >>> _ = sc.version """ return self._jsc.version() @@ -522,6 +560,8 @@ class SparkContext: * in case of local spark app something like 'local-1433865536131' * in case of YARN something like 'application_1433865536131_34483' + .. versionadded:: 1.5.0 + Examples -------- >>> sc.applicationId # doctest: +ELLIPSIS @@ -531,19 +571,40 @@ class SparkContext: @property def uiWebUrl(self) -> str: - """Return the URL of the SparkUI instance started by this SparkContext""" + """Return the URL of the SparkUI instance started by this :class:`SparkContext` + + .. versionadded:: 2.1.0 + + Examples + -------- + >>> sc.uiWebUrl + 'http://...' + """ return self._jsc.sc().uiWebUrl().get() @property def startTime(self) -> int: - """Return the epoch time when the Spark Context was started.""" + """Return the epoch time when the :class:`SparkContext` was started. + + .. versionadded:: 1.5.0 + + Examples + -------- + >>> _ = sc.startTime + """ return self._jsc.startTime() @property def defaultParallelism(self) -> int: """ - Default level of parallelism to use when not given by user (e.g. for - reduce tasks) + Default level of parallelism to use when not given by user (e.g. for reduce tasks) + + .. versionadded:: 0.7.0 + + Examples + -------- + >>> sc.defaultParallelism > 0 + True """ return self._jsc.sc().defaultParallelism() @@ -551,12 +612,21 @@ class SparkContext: def defaultMinPartitions(self) -> int: """ Default min number of partitions for Hadoop RDDs when not given by user + + .. versionadded:: 1.1.0 + + Examples + -------- + >>> sc.defaultMinPartitions > 0 + True """ return self._jsc.sc().defaultMinPartitions() def stop(self) -> None: """ - Shut down the SparkContext. + Shut down the :class:`SparkContext`. + + .. 
versionadded:: 0.7.0 """ if getattr(self, "_jsc", None): try: @@ -579,7 +649,21 @@ class SparkContext: def emptyRDD(self) -> RDD[Any]: """ - Create an RDD that has no partitions or elements. + Create an :class:`RDD` that has no partitions or elements. + + .. versionadded:: 1.5.0 + + Returns + ------- + :class:`RDD` + An empty RDD + + Examples + -------- + >>> sc.emptyRDD() + EmptyRDD... + >>> sc.emptyRDD().count() + 0 """ return RDD(self._jsc.emptyRDD(), self, NoOpSerializer()) @@ -592,22 +676,28 @@ class SparkContext: way as python's built-in range() function. If called with a single argument, the argument is interpreted as `end`, and `start` is set to 0. + .. versionadded:: 1.5.0 + Parameters ---------- start : int the start value end : int, optional the end value (exclusive) - step : int, optional - the incremental step (default: 1) + step : int, optional, default 1 + the incremental step numSlices : int, optional the number of partitions of the new RDD Returns ------- - :py:class:`pyspark.RDD` + :class:`RDD` An RDD of int + See Also + -------- + :meth:`pyspark.sql.SparkSession.range` + Examples -------- >>> sc.range(5).collect() @@ -616,6 +706,20 @@ class SparkContext: [2, 3] >>> sc.range(1, 7, 2).collect() [1, 3, 5] + + Generate RDD with a negative step + + >>> sc.range(5, 0, -1).collect() + [5, 4, 3, 2, 1] + >>> sc.range(0, 5, -1).collect() + [] + + Control the number of partitions + + >>> sc.range(5, numSlices=1).getNumPartitions() + 1 + >>> sc.range(5, numSlices=10).getNumPartitions() + 10 """ if end is None: end = start @@ -628,12 +732,32 @@ class SparkContext: Distribute a local Python collection to form an RDD. Using range is recommended if the input represents a range for performance. + .. versionadded:: 0.7.0 + + Parameters + ---------- + c : :class:`collections.abc.Iterable` + iterable collection to distribute + numSlices : int, optional + the number of partitions of the new RDD + + Returns + ------- + :class:`RDD` + RDD representing distributed collection. + Examples -------- >>> sc.parallelize([0, 2, 3, 4, 6], 5).glom().collect() [[0], [2], [3], [4], [6]] >>> sc.parallelize(range(0, 6, 2), 5).glom().collect() [[], [0], [], [2], [4]] + + Deal with a list of strings. + + >>> strings = ["a", "b", "c"] + >>> sc.parallelize(strings, 2).glom().collect() + [['a'], ['b', 'c']] """ numSlices = int(numSlices) if numSlices is not None else self.defaultParallelism if isinstance(c, range): @@ -694,7 +818,7 @@ class SparkContext: -------- data object to be serialized - serializer : :py:class:`pyspark.serializers.Serializer` + serializer : class:`pyspark.serializers.Serializer` reader_func : function A function which takes a filename and reads in the data in the jvm and returns a JavaRDD. Only used when encryption is disabled. @@ -731,13 +855,51 @@ class SparkContext: """ Load an RDD previously saved using :meth:`RDD.saveAsPickleFile` method. + .. versionadded:: 1.1.0 + + Parameters + ---------- + name : str + directory to the input data files, the path can be comma separated + paths as a list of inputs + minPartitions : int, optional + suggested minimum number of partitions for the resulting RDD + + Returns + ------- + :class:`RDD` + RDD representing unpickled data from the file(s). 
+ + See Also + -------- + :meth:`RDD.saveAsPickleFile` + Examples -------- - >>> tmpFile = NamedTemporaryFile(delete=True) - >>> tmpFile.close() - >>> sc.parallelize(range(10)).saveAsPickleFile(tmpFile.name, 5) - >>> sorted(sc.pickleFile(tmpFile.name, 3).collect()) + >>> import os + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as d: + ... # Write a temporary pickled file + ... path1 = os.path.join(d, "pickled1") + ... sc.parallelize(range(10)).saveAsPickleFile(path1, 3) + ... + ... # Write another temporary pickled file + ... path2 = os.path.join(d, "pickled2") + ... sc.parallelize(range(-10, -5)).saveAsPickleFile(path2, 3) + ... + ... # Load picked file + ... collected1 = sorted(sc.pickleFile(path1, 3).collect()) + ... collected2 = sorted(sc.pickleFile(path2, 4).collect()) + ... + ... # Load two picked files together + ... collected3 = sorted(sc.pickleFile('{},{}'.format(path1, path2), 5).collect()) + + >>> collected1 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] + >>> collected2 + [-10, -9, -8, -7, -6] + >>> collected3 + [-10, -9, -8, -7, -6, 0, 1, 2, 3, 4, 5, 6, 7, 8, 9] """ minPartitions = minPartitions or self.defaultMinPartitions return RDD(self._jsc.objectFile(name, minPartitions), self) @@ -748,21 +910,60 @@ class SparkContext: """ Read a text file from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI, and return it as an - RDD of Strings. - The text files must be encoded as UTF-8. + RDD of Strings. The text files must be encoded as UTF-8. - If use_unicode is False, the strings will be kept as `str` (encoding - as `utf-8`), which is faster and smaller than unicode. (Added in - Spark 1.2) + .. versionadded:: 0.7.0 + + Parameters + ---------- + name : str + directory to the input data files, the path can be comma separated + paths as a list of inputs + minPartitions : int, optional + suggested minimum number of partitions for the resulting RDD + use_unicode : bool, default True + If `use_unicode` is False, the strings will be kept as `str` (encoding + as `utf-8`), which is faster and smaller than unicode. + + .. versionadded:: 1.2.0 + + Returns + ------- + :class:`RDD` + RDD representing text data from the file(s). + + See Also + -------- + :meth:`RDD.saveAsTextFile` + :meth:`SparkContext.wholeTextFiles` Examples -------- - >>> path = os.path.join(tempdir, "sample-text.txt") - >>> with open(path, "w") as testFile: - ... _ = testFile.write("Hello world!") - >>> textFile = sc.textFile(path) - >>> textFile.collect() - ['Hello world!'] + >>> import os + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as d: + ... path1 = os.path.join(d, "text1") + ... path2 = os.path.join(d, "text2") + ... + ... # Write a temporary text file + ... sc.parallelize(["x", "y", "z"]).saveAsTextFile(path1) + ... + ... # Write another temporary text file + ... sc.parallelize(["aa", "bb", "cc"]).saveAsTextFile(path2) + ... + ... # Load text file + ... collected1 = sorted(sc.textFile(path1, 3).collect()) + ... collected2 = sorted(sc.textFile(path2, 4).collect()) + ... + ... # Load two text files together + ... collected3 = sorted(sc.textFile('{},{}'.format(path1, path2), 5).collect()) + + >>> collected1 + ['x', 'y', 'z'] + >>> collected2 + ['aa', 'bb', 'cc'] + >>> collected3 + ['aa', 'bb', 'cc', 'x', 'y', 'z'] """ minPartitions = minPartitions or min(self.defaultParallelism, 2) return RDD(self._jsc.textFile(name, minPartitions), self, UTF8Deserializer(use_unicode)) @@ -778,9 +979,7 @@ class SparkContext: value is the content of each file. 
The text files must be encoded as UTF-8. - If `use_unicode` is False, the strings will be kept as `str` (encoding - as `utf-8`), which is faster and smaller than unicode. (Added in - Spark 1.2) + .. versionadded:: 1.0.0 For example, if you have the following files: @@ -801,21 +1000,49 @@ class SparkContext: ... (a-hdfs-path/part-nnnnn, its content) + Parameters + ---------- + path : str + directory to the input data files, the path can be comma separated + paths as a list of inputs + minPartitions : int, optional + suggested minimum number of partitions for the resulting RDD + use_unicode : bool, default True + If `use_unicode` is False, the strings will be kept as `str` (encoding + as `utf-8`), which is faster and smaller than unicode. + + .. versionadded:: 1.2.0 + + Returns + ------- + :class:`RDD` + RDD representing path-content pairs from the file(s). + Notes ----- Small files are preferred, as each file will be loaded fully in memory. + See Also + -------- + :meth:`RDD.saveAsTextFile` + :meth:`SparkContext.textFile` + Examples -------- - >>> dirPath = os.path.join(tempdir, "files") - >>> os.mkdir(dirPath) - >>> with open(os.path.join(dirPath, "1.txt"), "w") as file1: - ... _ = file1.write("1") - >>> with open(os.path.join(dirPath, "2.txt"), "w") as file2: - ... _ = file2.write("2") - >>> textFiles = sc.wholeTextFiles(dirPath) - >>> sorted(textFiles.collect()) - [('.../1.txt', '1'), ('.../2.txt', '2')] + >>> import os + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as d: + ... # Write a temporary text file + ... with open(os.path.join(d, "1.txt"), "w") as f: + ... _ = f.write("123") + ... + ... # Write another temporary text file + ... with open(os.path.join(d, "2.txt"), "w") as f: + ... _ = f.write("xyz") + ... + ... collected = sorted(sc.wholeTextFiles(d).collect()) + >>> collected + [('.../1.txt', '123'), ('.../2.txt', 'xyz')] """ minPartitions = minPartitions or self.defaultMinPartitions return RDD( @@ -832,9 +1059,46 @@ class SparkContext: in a key-value pair, where the key is the path of each file, the value is the content of each file. + .. versionadded:: 1.3.0 + + Parameters + ---------- + path : str + directory to the input data files, the path can be comma separated + paths as a list of inputs + minPartitions : int, optional + suggested minimum number of partitions for the resulting RDD + + Returns + ------- + :class:`RDD` + RDD representing path-content pairs from the file(s). + Notes ----- Small files are preferred, large file is also allowable, but may cause bad performance. + + See Also + -------- + :meth:`SparkContext.binaryRecords` + + Examples + -------- + >>> import os + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as d: + ... # Write a temporary binary file + ... with open(os.path.join(d, "1.bin"), "wb") as f1: + ... _ = f1.write(b"binary data I") + ... + ... # Write another temporary binary file + ... with open(os.path.join(d, "2.bin"), "wb") as f2: + ... _ = f2.write(b"binary data II") + ... + ... collected = sorted(sc.binaryFiles(d).collect()) + + >>> collected + [('.../1.bin', b'binary data I'), ('.../2.bin', b'binary data II')] """ minPartitions = minPartitions or self.defaultMinPartitions return RDD( @@ -849,12 +1113,43 @@ class SparkContext: with the specified numerical format (see ByteBuffer), and the number of bytes per record is constant. + .. 
versionadded:: 1.3.0 + Parameters ---------- path : str Directory to the input data files recordLength : int The length at which to split the records + + Returns + ------- + :class:`RDD` + RDD of data with values, represented as byte arrays + + See Also + -------- + :meth:`SparkContext.binaryFiles` + + Examples + -------- + >>> import os + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as d: + ... # Write a temporary file + ... with open(os.path.join(d, "1.bin"), "w") as f: + ... for i in range(3): + ... _ = f.write("%04d" % i) + ... + ... # Write another file + ... with open(os.path.join(d, "2.bin"), "w") as f: + ... for i in [-1, -2, -10]: + ... _ = f.write("%04d" % i) + ... + ... collected = sorted(sc.binaryRecords(d, 4).collect()) + + >>> collected + [b'-001', b'-002', b'-010', b'0000', b'0001', b'0002'] """ return RDD(self._jsc.binaryRecords(path, recordLength), self, NoOpSerializer()) @@ -888,6 +1183,8 @@ class SparkContext: 3. If this fails, the fallback is to call 'toString' on each key and value 4. :class:`CPickleSerializer` is used to deserialize pickled objects on the Python side + .. versionadded:: 1.3.0 + Parameters ---------- path : str @@ -903,9 +1200,43 @@ class SparkContext: fully qualifiedname of a function returning value WritableConverter minSplits : int, optional minimum splits in dataset (default min(2, sc.defaultParallelism)) - batchSize : int, optional + batchSize : int, optional, default 0 The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically) + + Returns + ------- + :class:`RDD` + RDD of tuples of key and corresponding value + + See Also + -------- + :meth:`RDD.saveAsSequenceFile` + :meth:`RDD.saveAsNewAPIHadoopFile` + :meth:`RDD.saveAsHadoopFile` + :meth:`SparkContext.newAPIHadoopFile` + :meth:`SparkContext.hadoopFile` + + Examples + -------- + >>> import os + >>> import tempfile + + Set the class of output format + + >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat" + + >>> with tempfile.TemporaryDirectory() as d: + ... path = os.path.join(d, "hadoop_file") + ... + ... # Write a temporary Hadoop file + ... rdd = sc.parallelize([(1, {3.0: "bb"}), (2, {1.0: "aa"}), (3, {2.0: "dd"})]) + ... rdd.saveAsNewAPIHadoopFile(path, output_format_class) + ... + ... collected = sorted(sc.sequenceFile(path).collect()) + + >>> collected + [(1, {3.0: 'bb'}), (2, {1.0: 'aa'}), (3, {2.0: 'dd'})] """ minSplits = minSplits or min(self.defaultParallelism, 2) assert self._jvm is not None @@ -935,11 +1266,13 @@ class SparkContext: """ Read a 'new API' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. - The mechanism is the same as for :py:meth:`SparkContext.sequenceFile`. + The mechanism is the same as for meth:`SparkContext.sequenceFile`. A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java + .. versionadded:: 1.1.0 + Parameters ---------- path : str @@ -962,9 +1295,47 @@ class SparkContext: conf : dict, optional Hadoop configuration, passed in as a dict None by default - batchSize : int, optional + batchSize : int, optional, default 0 The number of Python objects represented as a single Java object. 
(default 0, choose batchSize automatically) + + Returns + ------- + :class:`RDD` + RDD of tuples of key and corresponding value + + See Also + -------- + :meth:`RDD.saveAsSequenceFile` + :meth:`RDD.saveAsNewAPIHadoopFile` + :meth:`RDD.saveAsHadoopFile` + :meth:`SparkContext.sequenceFile` + :meth:`SparkContext.hadoopFile` + + Examples + -------- + >>> import os + >>> import tempfile + + Set the related classes + + >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat" + >>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat" + >>> key_class = "org.apache.hadoop.io.IntWritable" + >>> value_class = "org.apache.hadoop.io.Text" + + >>> with tempfile.TemporaryDirectory() as d: + ... path = os.path.join(d, "new_hadoop_file") + ... + ... # Write a temporary Hadoop file + ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")]) + ... rdd.saveAsNewAPIHadoopFile(path, output_format_class, key_class, value_class) + ... + ... loaded = sc.newAPIHadoopFile(path, input_format_class, key_class, value_class) + ... collected = sorted(loaded.collect()) + + >>> collected + [(1, ''), (1, 'a'), (3, 'x')] """ jconf = self._dictToJavaMap(conf) assert self._jvm is not None @@ -995,7 +1366,9 @@ class SparkContext: Read a 'new API' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. - The mechanism is the same as for :py:meth:`SparkContext.sequenceFile`. + The mechanism is the same as for meth:`SparkContext.sequenceFile`. + + .. versionadded:: 1.1.0 Parameters ---------- @@ -1015,9 +1388,58 @@ class SparkContext: (None by default) conf : dict, optional Hadoop configuration, passed in as a dict (None by default) - batchSize : int, optional + batchSize : int, optional, default 0 The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically) + + Returns + ------- + :class:`RDD` + RDD of tuples of key and corresponding value + + See Also + -------- + :meth:`RDD.saveAsNewAPIHadoopDataset` + :meth:`RDD.saveAsHadoopDataset` + :meth:`SparkContext.hadoopRDD` + :meth:`SparkContext.hadoopFile` + + Examples + -------- + >>> import os + >>> import tempfile + + Set the related classes + + >>> output_format_class = "org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat" + >>> input_format_class = "org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat" + >>> key_class = "org.apache.hadoop.io.IntWritable" + >>> value_class = "org.apache.hadoop.io.Text" + + >>> with tempfile.TemporaryDirectory() as d: + ... path = os.path.join(d, "new_hadoop_file") + ... + ... # Create the conf for writing + ... write_conf = { + ... "mapreduce.job.outputformat.class": (output_format_class), + ... "mapreduce.job.output.key.class": key_class, + ... "mapreduce.job.output.value.class": value_class, + ... "mapreduce.output.fileoutputformat.outputdir": path, + ... } + ... + ... # Write a temporary Hadoop file + ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")]) + ... rdd.saveAsNewAPIHadoopDataset(conf=write_conf) + ... + ... # Create the conf for reading + ... read_conf = {"mapreduce.input.fileinputformat.inputdir": path} + ... + ... loaded = sc.newAPIHadoopRDD(input_format_class, + ... key_class, value_class, conf=read_conf) + ... 
collected = sorted(loaded.collect()) + + >>> collected + [(1, ''), (1, 'a'), (3, 'x')] """ jconf = self._dictToJavaMap(conf) assert self._jvm is not None @@ -1047,11 +1469,15 @@ class SparkContext: """ Read an 'old' Hadoop InputFormat with arbitrary key and value class from HDFS, a local file system (available on all nodes), or any Hadoop-supported file system URI. - The mechanism is the same as for :py:meth:`SparkContext.sequenceFile`. + The mechanism is the same as for meth:`SparkContext.sequenceFile`. + + .. versionadded:: 1.1.0 A Hadoop configuration can be passed in as a Python dict. This will be converted into a Configuration in Java. + Parameters + ---------- path : str path to Hadoop file inputFormatClass : str @@ -1064,15 +1490,51 @@ class SparkContext: (e.g. "org.apache.hadoop.io.LongWritable") keyConverter : str, optional fully qualified name of a function returning key WritableConverter - (None by default) valueConverter : str, optional fully qualified name of a function returning value WritableConverter - (None by default) conf : dict, optional - Hadoop configuration, passed in as a dict (None by default) - batchSize : int, optional + Hadoop configuration, passed in as a dict + batchSize : int, optional, default 0 The number of Python objects represented as a single Java object. (default 0, choose batchSize automatically) + + Returns + ------- + :class:`RDD` + RDD of tuples of key and corresponding value + + See Also + -------- + :meth:`RDD.saveAsSequenceFile` + :meth:`RDD.saveAsNewAPIHadoopFile` + :meth:`RDD.saveAsHadoopFile` + :meth:`SparkContext.newAPIHadoopFile` + :meth:`SparkContext.hadoopRDD` + + Examples + -------- + >>> import os + >>> import tempfile + + Set the related classes + + >>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat" + >>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat" + >>> key_class = "org.apache.hadoop.io.IntWritable" + >>> value_class = "org.apache.hadoop.io.Text" + + >>> with tempfile.TemporaryDirectory() as d: + ... path = os.path.join(d, "old_hadoop_file") + ... + ... # Write a temporary Hadoop file + ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")]) + ... rdd.saveAsHadoopFile(path, output_format_class, key_class, value_class) + ... + ... loaded = sc.hadoopFile(path, input_format_class, key_class, value_class) + ... collected = sorted(loaded.collect()) + + >>> collected + [(0, '1\\t'), (0, '1\\ta'), (0, '3\\tx')] """ jconf = self._dictToJavaMap(conf) assert self._jvm is not None @@ -1103,7 +1565,9 @@ class SparkContext: Read an 'old' Hadoop InputFormat with arbitrary key and value class, from an arbitrary Hadoop configuration, which is passed in as a Python dict. This will be converted into a Configuration in Java. - The mechanism is the same as for :py:meth:`SparkContext.sequenceFile`. + The mechanism is the same as for meth:`SparkContext.sequenceFile`. + + .. versionadded:: 1.1.0 Parameters ---------- @@ -1117,15 +1581,61 @@ class SparkContext: (e.g. "org.apache.hadoop.io.LongWritable") keyConverter : str, optional fully qualified name of a function returning key WritableConverter - (None by default) valueConverter : str, optional fully qualified name of a function returning value WritableConverter - (None by default) conf : dict, optional - Hadoop configuration, passed in as a dict (None by default) - batchSize : int, optional + Hadoop configuration, passed in as a dict + batchSize : int, optional, default 0 The number of Python objects represented as a single Java object. 
(default 0, choose batchSize automatically) + + Returns + ------- + :class:`RDD` + RDD of tuples of key and corresponding value + + See Also + -------- + :meth:`RDD.saveAsNewAPIHadoopDataset` + :meth:`RDD.saveAsHadoopDataset` + :meth:`SparkContext.newAPIHadoopRDD` + :meth:`SparkContext.hadoopFile` + + Examples + -------- + >>> import os + >>> import tempfile + + Set the related classes + + >>> output_format_class = "org.apache.hadoop.mapred.TextOutputFormat" + >>> input_format_class = "org.apache.hadoop.mapred.TextInputFormat" + >>> key_class = "org.apache.hadoop.io.IntWritable" + >>> value_class = "org.apache.hadoop.io.Text" + + >>> with tempfile.TemporaryDirectory() as d: + ... path = os.path.join(d, "old_hadoop_file") + ... + ... # Create the conf for writing + ... write_conf = { + ... "mapred.output.format.class": output_format_class, + ... "mapreduce.job.output.key.class": key_class, + ... "mapreduce.job.output.value.class": value_class, + ... "mapreduce.output.fileoutputformat.outputdir": path, + ... } + ... + ... # Write a temporary Hadoop file + ... rdd = sc.parallelize([(1, ""), (1, "a"), (3, "x")]) + ... rdd.saveAsHadoopDataset(conf=write_conf) + ... + ... # Create the conf for reading + ... read_conf = {"mapreduce.input.fileinputformat.inputdir": path} + ... + ... loaded = sc.hadoopRDD(input_format_class, key_class, value_class, conf=read_conf) + ... collected = sorted(loaded.collect()) + + >>> collected + [(0, '1\\t'), (0, '1\\ta'), (0, '3\\tx')] """ jconf = self._dictToJavaMap(conf) assert self._jvm is not None @@ -1153,16 +1663,28 @@ class SparkContext: although this forces them to be reserialized using the default serializer: + .. versionadded:: 0.7.0 + + See Also + -------- + :meth:`RDD.union` + Examples -------- - >>> path = os.path.join(tempdir, "union-text.txt") - >>> with open(path, "w") as testFile: - ... _ = testFile.write("Hello") - >>> textFile = sc.textFile(path) - >>> textFile.collect() - ['Hello'] - >>> parallelized = sc.parallelize(["World!"]) - >>> sorted(sc.union([textFile, parallelized]).collect()) + >>> import os + >>> import tempfile + >>> with tempfile.TemporaryDirectory() as d: + ... # generate a text RDD + ... with open(os.path.join(d, "union-text.txt"), "w") as f: + ... _ = f.write("Hello") + ... text_rdd = sc.textFile(d) + ... + ... # generate another RDD + ... parallelized = sc.parallelize(["World!"]) + ... + ... unioned = sorted(sc.union([text_rdd, parallelized]).collect()) + + >>> unioned ['Hello', 'World!'] """ first_jrdd_deserializer = rdds[0]._jrdd_deserializer @@ -1194,6 +1716,30 @@ class SparkContext: Broadcast a read-only variable to the cluster, returning a :class:`Broadcast` object for reading it in distributed functions. The variable will be sent to each cluster only once. + + .. versionadded:: 0.7.0 + + Parameters + ---------- + value : T + value to broadcast to the Spark nodes + + Returns + ------- + :class:`Broadcast` + :class:`Broadcast` object, a read-only variable cached on each machine + + Examples + -------- + >>> mapping = {1: 10001, 2: 10002} + >>> bc = sc.broadcast(mapping) + + >>> rdd = sc.range(5) + >>> rdd2 = rdd.map(lambda i: bc.value[i] if i in bc.value else -1) + >>> rdd2.collect() + [-1, 10001, 10002, -1, -1] + + >>> bc.destroy() """ return Broadcast(self, value, self._pickled_broadcast_vars) @@ -1206,6 +1752,39 @@ class SparkContext: data type if provided. Default AccumulatorParams are used for integers and floating-point numbers if you do not provide one. For other types, a custom AccumulatorParam can be used. + + .. 
versionadded:: 0.7.0 + + Parameters + ---------- + value : T + initialized value + accum_param : :class:`pyspark.AccumulatorParam`, optional + helper object to define how to add values + + Returns + ------- + :class:`Accumulator` + `Accumulator` object, a shared variable that can be accumulated + + Examples + -------- + >>> acc = sc.accumulator(9) + >>> acc.value + 9 + >>> acc += 1 + >>> acc.value + 10 + + Accumulator object can be accumulated in RDD operations: + + >>> rdd = sc.range(5) + >>> def f(x): + ... global acc + ... acc += 1 + >>> rdd.foreach(f) + >>> acc.value + 15 """ if accum_param is None: if isinstance(value, int): @@ -1232,31 +1811,67 @@ class SparkContext: A directory can be given if the recursive option is set to True. Currently directories are only supported for Hadoop-supported filesystems. + .. versionadded:: 0.7.0 + + Parameters + ---------- + path : str + can be either a local file, a file in HDFS (or other Hadoop-supported + filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, + use :meth:`SparkFiles.get` to find its download location. + recursive : bool, default False + whether to recursively add files in the input directory + + See Also + -------- + :meth:`SparkContext.listFiles` + :meth:`SparkContext.addPyFile` + :meth:`SparkFiles.get` + Notes ----- A path can be added only once. Subsequent additions of the same path are ignored. Examples -------- + >>> import os + >>> import tempfile >>> from pyspark import SparkFiles - >>> path = os.path.join(tempdir, "test.txt") - >>> with open(path, "w") as testFile: - ... _ = testFile.write("100") - >>> sc.addFile(path) - >>> def func(iterator): - ... with open(SparkFiles.get("test.txt")) as testFile: - ... fileVal = int(testFile.readline()) - ... return [x * fileVal for x in iterator] - >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect() + + >>> with tempfile.TemporaryDirectory() as d: + ... path1 = os.path.join(d, "test1.txt") + ... with open(path1, "w") as f: + ... _ = f.write("100") + ... + ... path2 = os.path.join(d, "test2.txt") + ... with open(path2, "w") as f: + ... _ = f.write("200") + ... + ... sc.addFile(path1) + ... file_list1 = sorted(sc.listFiles) + ... + ... sc.addFile(path2) + ... file_list2 = sorted(sc.listFiles) + ... + ... # add path2 twice, this addition will be ignored + ... sc.addFile(path2) + ... file_list3 = sorted(sc.listFiles) + ... + ... def func(iterator): + ... with open(SparkFiles.get("test1.txt")) as f: + ... mul = int(f.readline()) + ... return [x * mul for x in iterator] + ... + ... collected = sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect() + + >>> file_list1 + ['file:/.../test1.txt'] + >>> file_list2 + ['file:/.../test1.txt', 'file:/.../test2.txt'] + >>> file_list3 + ['file:/.../test1.txt', 'file:/.../test2.txt'] + >>> collected [100, 200, 300, 400] - >>> sc.listFiles - ['file:/.../test.txt'] - >>> path2 = os.path.join(tempdir, "test2.txt") - >>> with open(path2, "w") as testFile: - ... _ = testFile.write("100") - >>> sc.addFile(path2) - >>> sorted(sc.listFiles) - ['file:/.../test.txt', 'file:/.../test2.txt'] """ self._jsc.sc().addFile(path, recursive) @@ -1268,7 +1883,7 @@ class SparkContext: See Also -------- - SparkContext.addFile + :meth:`SparkContext.addFile` """ return list( self._jvm.scala.collection.JavaConverters.seqAsJavaList( # type: ignore[union-attr] @@ -1283,6 +1898,17 @@ class SparkContext: file, a file in HDFS (or other Hadoop-supported filesystems), or an HTTP, HTTPS or FTP URI. + .. 
versionadded:: 0.7.0 + + Parameters + ---------- + path : str + can be either a .py file or .zip dependency. + + See Also + -------- + :meth:`SparkContext.addFile` + Notes ----- A path can be added only once. Subsequent additions of the same path are ignored. @@ -1310,6 +1936,18 @@ class SparkContext: .. versionadded:: 3.3.0 + Parameters + ---------- + path : str + can be either a local file, a file in HDFS (or other Hadoop-supported + filesystems), or an HTTP, HTTPS or FTP URI. To access the file in Spark jobs, + use :meth:`SparkFiles.get` to find its download location. + + See Also + -------- + :meth:`SparkContext.listArchives` + :meth:`SparkFiles.get` + Notes ----- A path can be added only once. Subsequent additions of the same path are ignored. @@ -1319,34 +1957,48 @@ class SparkContext: -------- Creates a zipped file that contains a text file written '100'. + >>> import os + >>> import tempfile >>> import zipfile >>> from pyspark import SparkFiles - >>> path = os.path.join(tempdir, "test.txt") - >>> zip_path = os.path.join(tempdir, "test.zip") - >>> with zipfile.ZipFile(zip_path, "w", zipfile.ZIP_DEFLATED) as zipped: - ... with open(path, "w") as f: - ... _ = f.write("100") - ... zipped.write(path, os.path.basename(path)) - >>> sc.addArchive(zip_path) - >>> sc.listArchives - ['file:/.../test.zip'] - >>> zip_path2 = os.path.join(tempdir, "test2.zip") - >>> with zipfile.ZipFile(zip_path2, "w", zipfile.ZIP_DEFLATED) as zipped: + + >>> with tempfile.TemporaryDirectory() as d: + ... path = os.path.join(d, "test.txt") ... with open(path, "w") as f: ... _ = f.write("100") - ... zipped.write(path, os.path.basename(path)) - >>> sc.addArchive(zip_path2) - >>> sorted(sc.listArchives) - ['file:/.../test.zip', 'file:/.../test2.zip'] - - Reads the '100' as an integer in the zipped file, and processes - it with the data in the RDD. - - >>> def func(iterator): - ... with open("%s/test.txt" % SparkFiles.get("test.zip")) as f: - ... v = int(f.readline()) - ... return [x * int(v) for x in iterator] - >>> sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect() + ... + ... zip_path1 = os.path.join(d, "test1.zip") + ... with zipfile.ZipFile(zip_path1, "w", zipfile.ZIP_DEFLATED) as z: + ... z.write(path, os.path.basename(path)) + ... + ... zip_path2 = os.path.join(d, "test2.zip") + ... with zipfile.ZipFile(zip_path2, "w", zipfile.ZIP_DEFLATED) as z: + ... z.write(path, os.path.basename(path)) + ... + ... sc.addArchive(zip_path1) + ... arch_list1 = sorted(sc.listArchives) + ... + ... sc.addArchive(zip_path2) + ... arch_list2 = sorted(sc.listArchives) + ... + ... # add zip_path2 twice, this addition will be ignored + ... sc.addArchive(zip_path2) + ... arch_list3 = sorted(sc.listArchives) + ... + ... def func(iterator): + ... with open("%s/test.txt" % SparkFiles.get("test1.zip")) as f: + ... mul = int(f.readline()) + ... return [x * mul for x in iterator] + ... + ... 
collected = sc.parallelize([1, 2, 3, 4]).mapPartitions(func).collect() + + >>> arch_list1 + ['file:/.../test1.zip'] + >>> arch_list2 + ['file:/.../test1.zip', 'file:/.../test2.zip'] + >>> arch_list3 + ['file:/.../test1.zip', 'file:/.../test2.zip'] + >>> collected [100, 200, 300, 400] """ self._jsc.sc().addArchive(path) @@ -1359,7 +2011,7 @@ class SparkContext: See Also -------- - SparkContext.addArchive + :meth:`SparkContext.addArchive` """ return list( self._jvm.scala.collection.JavaConverters.seqAsJavaList( # type: ignore[union-attr] @@ -1371,14 +2023,31 @@ class SparkContext: """ Set the directory under which RDDs are going to be checkpointed. The directory must be an HDFS path if running on a cluster. + + .. versionadded:: 0.7.0 + + Parameters + ---------- + dirName : str + path to the directory where checkpoint files will be stored + (must be HDFS path if running in cluster) + + See Also + -------- + :meth:`SparkContext.getCheckpointDir` """ self._jsc.sc().setCheckpointDir(dirName) - @since(3.1) def getCheckpointDir(self) -> Optional[str]: """ Return the directory where RDDs are checkpointed. Returns None if no checkpoint directory has been set. + + .. versionadded:: 3.1.0 + + See Also + -------- + :meth:`SparkContext.setCheckpointDir` """ if not self._jsc.sc().getCheckpointDir().isEmpty(): return self._jsc.sc().getCheckpointDir().get() @@ -1412,6 +2081,17 @@ class SparkContext: The application can use :meth:`SparkContext.cancelJobGroup` to cancel all running jobs in this group. + .. versionadded:: 1.0.0 + + Parameters + ---------- + groupId : str + The group ID to assign. + description : str + The description to set for the job group. + interruptOnCancel : bool, optional, default False + whether to interrupt jobs on job cancellation. + Notes ----- If interruptOnCancel is set to true for the job group, then job cancellation will result @@ -1422,6 +2102,10 @@ class SparkContext: If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread local inheritance. + See Also + -------- + :meth:`SparkContext.cancelJobGroup` + Examples -------- >>> import threading @@ -1457,6 +2141,19 @@ class SparkContext: Set a local property that affects jobs submitted from this thread, such as the Spark fair scheduler pool. + .. versionadded:: 1.0.0 + + Parameters + ---------- + key : str + The key of the local property to set. + value : str + The value of the local property to set. + + See Also + -------- + :meth:`SparkContext.getLocalProperty` + Notes ----- If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread @@ -1468,6 +2165,12 @@ class SparkContext: """ Get a local property set in this thread, or null if it is missing. See :meth:`setLocalProperty`. + + .. versionadded:: 1.0.0 + + See Also + -------- + :meth:`SparkContext.setLocalProperty` """ return self._jsc.getLocalProperty(key) @@ -1475,6 +2178,13 @@ class SparkContext: """ Set a human readable description of the current job. + .. versionadded:: 2.3.0 + + Parameters + ---------- + value : str + The job description to set. + Notes ----- If you run jobs in parallel, use :class:`pyspark.InheritableThread` for thread @@ -1485,6 +2195,8 @@ class SparkContext: def sparkUser(self) -> str: """ Get SPARK_USER for user who is running SparkContext. + + .. versionadded:: 1.0.0 """ return self._jsc.sc().sparkUser() @@ -1492,18 +2204,39 @@ class SparkContext: """ Cancel active jobs for the specified group. See :meth:`SparkContext.setJobGroup`. for more information. + + .. 
versionadded:: 1.1.0 + + Parameters + ---------- + groupId : str + The group ID to cancel the job. + + See Also + -------- + :meth:`SparkContext.setJobGroup` + :meth:`SparkContext.cancelJobGroup` """ self._jsc.sc().cancelJobGroup(groupId) def cancelAllJobs(self) -> None: """ Cancel all jobs that have been scheduled or are running. + + .. versionadded:: 1.1.0 + + See Also + -------- + :meth:`SparkContext.cancelJobGroup` + :meth:`SparkContext.runJob` """ self._jsc.sc().cancelAllJobs() def statusTracker(self) -> StatusTracker: """ Return :class:`StatusTracker` object + + .. versionadded:: 1.4.0 """ return StatusTracker(self._jsc.statusTracker()) @@ -1520,6 +2253,29 @@ class SparkContext: If 'partitions' is not specified, this will run over all partitions. + .. versionadded:: 1.1.0 + + Parameters + ---------- + rdd : :class:`RDD` + target RDD to run tasks on + partitionFunc : function + a function to run on each partition of the RDD + partitions : list, optional + set of partitions to run on; some jobs may not want to compute on all + partitions of the target RDD, e.g. for operations like `first` + allowLocal : bool, default False + this parameter takes no effect + + Returns + ------- + list + results of specified partitions + + See Also + -------- + :meth:`SparkContext.cancelAllJobs` + Examples -------- >>> myRDD = sc.parallelize(range(6), 3) @@ -1542,7 +2298,14 @@ class SparkContext: return list(_load_from_socket(sock_info, mappedRDD._jrdd_deserializer)) def show_profiles(self) -> None: - """Print the profile stats to stdout""" + """Print the profile stats to stdout + + .. versionadded:: 1.2.0 + + See Also + -------- + :meth:`SparkContext.dump_profiles` + """ if self.profiler_collector is not None: self.profiler_collector.show_profiles() else: @@ -1552,7 +2315,14 @@ class SparkContext: ) def dump_profiles(self, path: str) -> None: - """Dump the profile stats into directory `path`""" + """Dump the profile stats into directory `path` + + .. versionadded:: 1.2.0 + + See Also + -------- + :meth:`SparkContext.show_profiles` + """ if self.profiler_collector is not None: self.profiler_collector.dump_profiles(path) else: @@ -1562,12 +2332,22 @@ class SparkContext: ) def getConf(self) -> SparkConf: + """Return a copy of this SparkContext's configuration :class:`SparkConf`. + + .. versionadded:: 2.1.0 + """ conf = SparkConf() conf.setAll(self._conf.getAll()) return conf @property def resources(self) -> Dict[str, ResourceInformation]: + """ + Return the resource information of this :class:`SparkContext`. + A resource could be a GPU, FPGA, etc. + + .. versionadded:: 3.0.0 + """ resources = {} jresources = self._jsc.resources() for x in jresources: @@ -1589,14 +2369,12 @@ class SparkContext: def _test() -> None: - import atexit import doctest - import tempfile + from pyspark import SparkConf globs = globals().copy() - globs["sc"] = SparkContext("local[4]", "PythonTest") - globs["tempdir"] = tempfile.mkdtemp() - atexit.register(lambda: shutil.rmtree(globs["tempdir"])) + conf = SparkConf().set("spark.ui.enabled", "True") + globs["sc"] = SparkContext("local[4]", "context tests", conf=conf) (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) globs["sc"].stop() if failure_count: --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org
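
As the PR description notes, the reworked examples are meant to be copy-pasteable directly into a PySpark shell, where `sc` is already defined. Below is a minimal sketch (an illustration, not part of the commit) of running the same pattern as the new `textFile` doctest outside the shell; it assumes a local PySpark installation and bootstraps a context with `SparkContext.getOrCreate()`, as documented in the diff above.

    # Minimal sketch (assumes a local PySpark installation). In the PySpark
    # shell `sc` already exists; here we obtain one via getOrCreate().
    import os
    import tempfile

    from pyspark import SparkContext

    sc = SparkContext.getOrCreate()

    with tempfile.TemporaryDirectory() as d:
        path = os.path.join(d, "text1")

        # Write a temporary text file, then load it back -- the same pattern
        # as the new textFile() doctest added by this commit.
        sc.parallelize(["x", "y", "z"]).saveAsTextFile(path)
        print(sorted(sc.textFile(path, 3).collect()))  # ['x', 'y', 'z']

    sc.stop()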