Repository: spark
Updated Branches:
  refs/heads/master 3864480e1 -> 20b7c684c


[SPARK-25248][.1][PYSPARK] update barrier Python API

## What changes were proposed in this pull request?

I made one pass over the Python APIs for barrier mode and updated them to match 
the Scala doc in #22240. Major changes:

* export the public classes
* expand the docs
* add doc for BarrierTaskInfo.address

cc: jiangxb1987

Closes #22261 from mengxr/SPARK-25248.1.

Authored-by: Xiangrui Meng <m...@databricks.com>
Signed-off-by: Xiangrui Meng <m...@databricks.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/20b7c684
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/20b7c684
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/20b7c684

Branch: refs/heads/master
Commit: 20b7c684cc4a8136b9a9c56390a4948de04e7c34
Parents: 3864480
Author: Xiangrui Meng <m...@databricks.com>
Authored: Wed Aug 29 07:22:03 2018 -0700
Committer: Xiangrui Meng <m...@databricks.com>
Committed: Wed Aug 29 07:22:03 2018 -0700

----------------------------------------------------------------------
 python/pyspark/__init__.py    | 12 +++++++++---
 python/pyspark/rdd.py         | 22 ++++++++++++++++++----
 python/pyspark/taskcontext.py | 26 +++++++++++++++++---------
 3 files changed, 44 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/20b7c684/python/pyspark/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyspark/__init__.py b/python/pyspark/__init__.py
index 5821891..ee153af 100644
--- a/python/pyspark/__init__.py
+++ b/python/pyspark/__init__.py
@@ -36,7 +36,12 @@ Public classes:
       Finer-grained cache persistence levels.
   - :class:`TaskContext`:
       Information about the current running task, available on the workers and 
experimental.
-
+  - :class:`RDDBarrier`:
+      Wraps an RDD under a barrier stage for barrier execution.
+  - :class:`BarrierTaskContext`:
+      A :class:`TaskContext` that provides extra info and tooling for barrier 
execution.
+  - :class:`BarrierTaskInfo`:
+      Information about a barrier task.
 """
 
 from functools import wraps
@@ -44,14 +49,14 @@ import types
 
 from pyspark.conf import SparkConf
 from pyspark.context import SparkContext
-from pyspark.rdd import RDD
+from pyspark.rdd import RDD, RDDBarrier
 from pyspark.files import SparkFiles
 from pyspark.storagelevel import StorageLevel
 from pyspark.accumulators import Accumulator, AccumulatorParam
 from pyspark.broadcast import Broadcast
 from pyspark.serializers import MarshalSerializer, PickleSerializer
 from pyspark.status import *
-from pyspark.taskcontext import TaskContext
+from pyspark.taskcontext import TaskContext, BarrierTaskContext, 
BarrierTaskInfo
 from pyspark.profiler import Profiler, BasicProfiler
 from pyspark.version import __version__
 from pyspark._globals import _NoValue
@@ -113,4 +118,5 @@ __all__ = [
     "SparkConf", "SparkContext", "SparkFiles", "RDD", "StorageLevel", 
"Broadcast",
     "Accumulator", "AccumulatorParam", "MarshalSerializer", "PickleSerializer",
     "StatusTracker", "SparkJobInfo", "SparkStageInfo", "Profiler", 
"BasicProfiler", "TaskContext",
+    "RDDBarrier", "BarrierTaskContext", "BarrierTaskInfo",
 ]

http://git-wip-us.apache.org/repos/asf/spark/blob/20b7c684/python/pyspark/rdd.py
----------------------------------------------------------------------
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 380475e..b317156 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -2390,7 +2390,18 @@ class RDD(object):
         """
         .. note:: Experimental
 
-        Indicates that Spark must launch the tasks together for the current 
stage.
+        Marks the current stage as a barrier stage, where Spark must launch 
all tasks together.
+        In case of a task failure, instead of only restarting the failed task, 
Spark will abort the
+        entire stage and relaunch all tasks for this stage.
+        The barrier execution mode feature is experimental and it only handles 
limited scenarios.
+        Please read the linked SPIP and design docs to understand the 
limitations and future plans.
+
+        :return: an :class:`RDDBarrier` instance that provides actions within 
a barrier stage.
+
+        .. seealso:: :class:`BarrierTaskContext`
+        .. seealso:: `SPIP: Barrier Execution Mode \
+            <http://jira.apache.org/jira/browse/SPARK-24374>`_
+        .. seealso:: `Design Doc 
<https://jira.apache.org/jira/browse/SPARK-24582>`_
 
         .. versionadded:: 2.4.0
         """
@@ -2430,8 +2441,8 @@ class RDDBarrier(object):
     """
     .. note:: Experimental
 
-    An RDDBarrier turns an RDD into a barrier RDD, which forces Spark to 
launch tasks of the stage
-    contains this RDD together.
+    Wraps an RDD in a barrier stage, which forces Spark to launch tasks of 
this stage together.
+    :class:`RDDBarrier` instances are created by :func:`RDD.barrier`.
 
     .. versionadded:: 2.4.0
     """
@@ -2443,7 +2454,10 @@ class RDDBarrier(object):
         """
         .. note:: Experimental
 
-        Return a new RDD by applying a function to each partition of this RDD.
+        Returns a new RDD by applying a function to each partition of the 
wrapped RDD,
+        where tasks are launched together in a barrier stage.
+        The interface is the same as :func:`RDD.mapPartitions`.
+        Please see the API doc there.
 
         .. versionadded:: 2.4.0
         """

http://git-wip-us.apache.org/repos/asf/spark/blob/20b7c684/python/pyspark/taskcontext.py
----------------------------------------------------------------------
diff --git a/python/pyspark/taskcontext.py b/python/pyspark/taskcontext.py
index 53fc2b2..b61643e 100644
--- a/python/pyspark/taskcontext.py
+++ b/python/pyspark/taskcontext.py
@@ -131,9 +131,8 @@ class BarrierTaskContext(TaskContext):
     """
     .. note:: Experimental
 
-    A TaskContext with extra info and tooling for a barrier stage. To access 
the BarrierTaskContext
-    for a running task, use:
-    L{BarrierTaskContext.get()}.
+    A :class:`TaskContext` with extra contextual info and tooling for tasks in 
a barrier stage.
+    Use :func:`BarrierTaskContext.get` to obtain the barrier context for a 
running barrier task.
 
     .. versionadded:: 2.4.0
     """
@@ -155,8 +154,11 @@ class BarrierTaskContext(TaskContext):
     @classmethod
     def get(cls):
         """
-        Return the currently active BarrierTaskContext. This can be called 
inside of user functions
-        to access contextual information about running tasks.
+        .. note:: Experimental
+
+        Return the currently active :class:`BarrierTaskContext`.
+        This can be called inside of user functions to access contextual 
information about
+        running tasks.
 
         .. note:: Must be called on the worker, not the driver. Returns None 
if not initialized.
         """
@@ -176,7 +178,12 @@ class BarrierTaskContext(TaskContext):
         .. note:: Experimental
 
         Sets a global barrier and waits until all tasks in this stage hit this 
barrier.
-        Note this method is only allowed for a BarrierTaskContext.
+        Similar to `MPI_Barrier` function in MPI, this function blocks until 
all tasks
+        in the same stage have reached this routine.
+
+        .. warning:: In a barrier stage, each task much have the same number 
of `barrier()`
+            calls, in all possible code branches.
+            Otherwise, you may get the job hanging or a SparkException after 
timeout.
 
         .. versionadded:: 2.4.0
         """
@@ -190,9 +197,8 @@ class BarrierTaskContext(TaskContext):
         """
         .. note:: Experimental
 
-        Returns the all task infos in this barrier stage, the task infos are 
ordered by
-        partitionId.
-        Note this method is only allowed for a BarrierTaskContext.
+        Returns :class:`BarrierTaskInfo` for all tasks in this barrier stage,
+        ordered by partition ID.
 
         .. versionadded:: 2.4.0
         """
@@ -210,6 +216,8 @@ class BarrierTaskInfo(object):
 
     Carries all task infos of a barrier task.
 
+    :var address: The IPv4 address (host:port) of the executor that the 
barrier task is running on
+
     .. versionadded:: 2.4.0
     """
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to