Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/22011#discussion_r208068705
--- Diff: python/pyspark/rdd.py ---
@@ -2429,6 +2441,29 @@ def _wrap_function(sc, func, deserializer, serializer, profiler=None):
sc.pythonVer, broadcast_vars,
sc._javaAccumulator)
+class RDDBarrier(object):
+
+ """
+ .. note:: Experimental
+
+ An RDDBarrier turns an RDD into a barrier RDD, which forces Spark to launch tasks of the stage
+ contains this RDD together.
+ """
+
+ def __init__(self, rdd):
+ self.rdd = rdd
+ self._jrdd = rdd._jrdd
+
+ def mapPartitions(self, f, preservesPartitioning=False):
--- End diff --
If we expose a package-private method on the Scala `RDDBarrier` that returns the annotated RDD
(the one with `isBarrier=True`), `mapPartitions` becomes easy to implement here:
~~~python
# Assumes the proposed package-private accessor (named `barrierRdd` here) exists on the Scala RDDBarrier.
jBarrierRdd = self._jrdd.rdd().barrier().barrierRdd().toJavaRDD()
pyBarrierRdd = RDD(jBarrierRdd, self.rdd.ctx, self.rdd._jrdd_deserializer)
return pyBarrierRdd.mapPartitions(f, preservesPartitioning)
~~~
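To make the suggested shape concrete without a running JVM, here is a minimal pure-Python sketch of the delegation pattern. Everything in it (`ToyRDD`, `ToyRDDBarrier`, `_barrier_rdd`) is a hypothetical stand-in rather than Spark API, and carrying the barrier flag forward to the child is a modelling assumption, not a statement about Spark's internals.

```python
class ToyRDD:
    """Hypothetical stand-in for an RDD: a list of partitions plus a barrier flag."""

    def __init__(self, partitions, is_barrier=False):
        # Materialize each partition so generators returned by user functions work.
        self.partitions = [list(p) for p in partitions]
        self.is_barrier = is_barrier

    def mapPartitions(self, f, preservesPartitioning=False):
        # Apply f to an iterator over each partition.
        # Modelling assumption: the child inherits the parent's barrier flag.
        return ToyRDD([f(iter(p)) for p in self.partitions], self.is_barrier)

    def collect(self):
        return [x for p in self.partitions for x in p]


class ToyRDDBarrier:
    """Hypothetical stand-in for RDDBarrier."""

    def __init__(self, rdd):
        self.rdd = rdd

    def _barrier_rdd(self):
        # Plays the role of the proposed package-private accessor:
        # same data, but annotated as a barrier RDD.
        return ToyRDD(self.rdd.partitions, is_barrier=True)

    def mapPartitions(self, f, preservesPartitioning=False):
        # No barrier-specific map logic here: delegate to the ordinary
        # mapPartitions of the annotated RDD.
        return self._barrier_rdd().mapPartitions(f, preservesPartitioning)


rdd = ToyRDD([[1, 2], [3]])
doubled = ToyRDDBarrier(rdd).mapPartitions(lambda it: (x * 2 for x in it))
print(doubled.is_barrier, doubled.collect())
```

The point of the sketch is that the barrier wrapper needs no `mapPartitions` logic of its own once the annotated RDD is reachable; the original `rdd` is left unmarked.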