Repository: spark Updated Branches: refs/heads/master 3864480e1 -> 20b7c684c
[SPARK-25248][.1][PYSPARK] update barrier Python API ## What changes were proposed in this pull request? I made one pass over the Python APIs for barrier mode and updated them to match the Scala doc in #22240 . Major changes: * export the public classes * expand the docs * add doc for BarrierTaskInfo.address cc: jiangxb1987 Closes #22261 from mengxr/SPARK-25248.1. Authored-by: Xiangrui Meng <m...@databricks.com> Signed-off-by: Xiangrui Meng <m...@databricks.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20b7c684 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20b7c684 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20b7c684 Branch: refs/heads/master Commit: 20b7c684cc4a8136b9a9c56390a4948de04e7c34 Parents: 3864480 Author: Xiangrui Meng <m...@databricks.com> Authored: Wed Aug 29 07:22:03 2018 -0700 Committer: Xiangrui Meng <m...@databricks.com> Committed: Wed Aug 29 07:22:03 2018 -0700 ---------------------------------------------------------------------- python/pyspark/__init__.py | 12 +++++++++--- python/pyspark/rdd.py | 22 ++++++++++++++++++---- python/pyspark/taskcontext.py | 26 +++++++++++++++++--------- 3 files changed, 44 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/20b7c684/python/pyspark/__init__.py ---------------------------------------------------------------------- diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py index 5821891..ee153af 100644 --- a/python/pyspark/__init__.py +++ b/python/pyspark/__init__.py @@ -36,7 +36,12 @@ Public classes: Finer-grained cache persistence levels. - :class:`TaskContext`: Information about the current running task, available on the workers and experimental. - + - :class:`RDDBarrier`: + Wraps an RDD under a barrier stage for barrier execution. 
+ - :class:`BarrierTaskContext`: + A :class:`TaskContext` that provides extra info and tooling for barrier execution. + - :class:`BarrierTaskInfo`: + Information about a barrier task. """ from functools import wraps @@ -44,14 +49,14 @@ import types from pyspark.conf import SparkConf from pyspark.context import SparkContext -from pyspark.rdd import RDD +from pyspark.rdd import RDD, RDDBarrier from pyspark.files import SparkFiles from pyspark.storagelevel import StorageLevel from pyspark.accumulators import Accumulator, AccumulatorParam from pyspark.broadcast import Broadcast from pyspark.serializers import MarshalSerializer, PickleSerializer from pyspark.status import * -from pyspark.taskcontext import TaskContext +from pyspark.taskcontext import TaskContext, BarrierTaskContext, BarrierTaskInfo from pyspark.profiler import Profiler, BasicProfiler from pyspark.version import __version__ from pyspark._globals import _NoValue @@ -113,4 +118,5 @@ __all__ = [ "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", "Broadcast", "Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer", "StatusTracker", "SparkJobInfo", "SparkStageInfo", "Profiler", "BasicProfiler", "TaskContext", + "RDDBarrier", "BarrierTaskContext", "BarrierTaskInfo", ] http://git-wip-us.apache.org/repos/asf/spark/blob/20b7c684/python/pyspark/rdd.py ---------------------------------------------------------------------- diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py index 380475e..b317156 100644 --- a/python/pyspark/rdd.py +++ b/python/pyspark/rdd.py @@ -2390,7 +2390,18 @@ class RDD(object): """ .. note:: Experimental - Indicates that Spark must launch the tasks together for the current stage. + Marks the current stage as a barrier stage, where Spark must launch all tasks together. + In case of a task failure, instead of only restarting the failed task, Spark will abort the + entire stage and relaunch all tasks for this stage. 
+ The barrier execution mode feature is experimental and it only handles limited scenarios. + Please read the linked SPIP and design docs to understand the limitations and future plans. + + :return: an :class:`RDDBarrier` instance that provides actions within a barrier stage. + + .. seealso:: :class:`BarrierTaskContext` + .. seealso:: `SPIP: Barrier Execution Mode \ + <http://jira.apache.org/jira/browse/SPARK-24374>`_ + .. seealso:: `Design Doc <https://jira.apache.org/jira/browse/SPARK-24582>`_ .. versionadded:: 2.4.0 """ @@ -2430,8 +2441,8 @@ class RDDBarrier(object): """ .. note:: Experimental - An RDDBarrier turns an RDD into a barrier RDD, which forces Spark to launch tasks of the stage - contains this RDD together. + Wraps an RDD in a barrier stage, which forces Spark to launch tasks of this stage together. + :class:`RDDBarrier` instances are created by :func:`RDD.barrier`. .. versionadded:: 2.4.0 """ @@ -2443,7 +2454,10 @@ class RDDBarrier(object): """ .. note:: Experimental - Return a new RDD by applying a function to each partition of this RDD. + Returns a new RDD by applying a function to each partition of the wrapped RDD, + where tasks are launched together in a barrier stage. + The interface is the same as :func:`RDD.mapPartitions`. + Please see the API doc there. .. versionadded:: 2.4.0 """ http://git-wip-us.apache.org/repos/asf/spark/blob/20b7c684/python/pyspark/taskcontext.py ---------------------------------------------------------------------- diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py index 53fc2b2..b61643e 100644 --- a/python/pyspark/taskcontext.py +++ b/python/pyspark/taskcontext.py @@ -131,9 +131,8 @@ class BarrierTaskContext(TaskContext): """ .. note:: Experimental - A TaskContext with extra info and tooling for a barrier stage. To access the BarrierTaskContext - for a running task, use: - L{BarrierTaskContext.get()}. + A :class:`TaskContext` with extra contextual info and tooling for tasks in a barrier stage. 
+ Use :func:`BarrierTaskContext.get` to obtain the barrier context for a running barrier task. .. versionadded:: 2.4.0 """ @@ -155,8 +154,11 @@ class BarrierTaskContext(TaskContext): @classmethod def get(cls): """ - Return the currently active BarrierTaskContext. This can be called inside of user functions - to access contextual information about running tasks. + .. note:: Experimental + + Return the currently active :class:`BarrierTaskContext`. + This can be called inside of user functions to access contextual information about + running tasks. .. note:: Must be called on the worker, not the driver. Returns None if not initialized. """ @@ -176,7 +178,12 @@ class BarrierTaskContext(TaskContext): .. note:: Experimental Sets a global barrier and waits until all tasks in this stage hit this barrier. - Note this method is only allowed for a BarrierTaskContext. + Similar to `MPI_Barrier` function in MPI, this function blocks until all tasks + in the same stage have reached this routine. + + .. warning:: In a barrier stage, each task must have the same number of `barrier()` + calls, in all possible code branches. + Otherwise, you may get the job hanging or a SparkException after timeout. .. versionadded:: 2.4.0 """ @@ -190,9 +197,8 @@ class BarrierTaskContext(TaskContext): """ .. note:: Experimental - Returns the all task infos in this barrier stage, the task infos are ordered by - partitionId. - Note this method is only allowed for a BarrierTaskContext. + Returns :class:`BarrierTaskInfo` for all tasks in this barrier stage, + ordered by partition ID. .. versionadded:: 2.4.0 """ @@ -210,6 +216,8 @@ class BarrierTaskInfo(object): Carries all task infos of a barrier task. + :var address: The IPv4 address (host:port) of the executor that the barrier task is running on + .. 
versionadded:: 2.4.0 """ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org