ueshin commented on a change in pull request #34238:
URL: https://github.com/apache/spark/pull/34238#discussion_r734034297



##########
File path: python/pyspark/context.py
##########
@@ -255,7 +298,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
 
         # Deploy code dependencies set by spark-submit; these will already have been added
         # with SparkContext.addFile, so we just need to add them to the PYTHONPATH
-        for path in self._conf.get("spark.submit.pyFiles", "").split(","):
+        for path in self._conf.get(
+                "spark.submit.pyFiles", "").split(","):  # type: ignore[union-attr]

Review comment:
       Do we need the ignore here?
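
       For context, the suppressed error presumably comes from `SparkConf.get` being annotated as returning `Optional[str]` even when a default is supplied. A minimal standalone sketch (the helper name `split_py_files` is hypothetical) of narrowing the `Optional` instead of silencing mypy:

       ```python
       from typing import List, Optional


       def split_py_files(py_files: Optional[str]) -> List[str]:
           # `or ""` narrows Optional[str] to str, so .split() type-checks
           # without a "# type: ignore[union-attr]" comment.
           return (py_files or "").split(",")
       ```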

##########
File path: python/pyspark/context.py
##########
@@ -206,26 +239,36 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
         self.environment["PYTHONHASHSEED"] = os.environ.get("PYTHONHASHSEED", "0")
 
         # Create the Java SparkContext through Py4J
-        self._jsc = jsc or self._initialize_context(self._conf._jconf)
+        self._jsc: JavaObject = jsc or self._initialize_context(self._conf._jconf)
         # Reset the SparkConf to the one actually used by the SparkContext in JVM.
-        self._conf = SparkConf(_jconf=self._jsc.sc().conf())
+        self._conf = SparkConf(_jconf=self._jsc.sc().conf())  # type: ignore[union-attr]

Review comment:
       Do we need the ignore here?

##########
File path: python/pyspark/files.py
##########
@@ -30,29 +35,30 @@ class SparkFiles(object):
     instances.
     """
 
-    _root_directory = None
-    _is_running_on_worker = False
-    _sc = None
+    _root_directory: Optional[str] = None
+    _is_running_on_worker: bool = False
+    _sc: Optional["SparkContext"] = None

Review comment:
       Should those be `ClassVar`?
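
       For reference, a minimal sketch of the `ClassVar` form this suggests, assuming these really are class-level attributes shared across instances:

       ```python
       from typing import TYPE_CHECKING, ClassVar, Optional

       if TYPE_CHECKING:
           from pyspark.context import SparkContext


       class SparkFiles(object):
           # ClassVar marks these as class attributes, so mypy flags
           # accidental assignment through an instance.
           _root_directory: ClassVar[Optional[str]] = None
           _is_running_on_worker: ClassVar[bool] = False
           _sc: ClassVar[Optional["SparkContext"]] = None
       ```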

##########
File path: python/pyspark/context.py
##########
@@ -565,16 +633,24 @@ def f(split, iterator):
         batchSize = max(1, min(len(c) // numSlices, self._batchSize or 1024))
         serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
 
-        def reader_func(temp_filename):
-            return self._jvm.PythonRDD.readRDDFromFile(self._jsc, temp_filename, numSlices)
+        def reader_func(temp_filename: str) -> JavaObject:
+            return self._jvm.PythonRDD.readRDDFromFile(  # type: ignore[union-attr]
+                self._jsc, temp_filename, numSlices)
 
-        def createRDDServer():
-            return self._jvm.PythonParallelizeServer(self._jsc.sc(), numSlices)
+        def createRDDServer() -> JavaObject:
+            return self._jvm.PythonParallelizeServer(  # type: ignore[union-attr]
+                self._jsc.sc(), numSlices)
 
         jrdd = self._serialize_to_jvm(c, serializer, reader_func, createRDDServer)
         return RDD(jrdd, self, serializer)
 
-    def _serialize_to_jvm(self, data, serializer, reader_func, createRDDServer):
+    def _serialize_to_jvm(
+        self,
+        data: object,

Review comment:
       `Any`?
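
       A standalone illustration of the trade-off (function names hypothetical): with `object`, mypy rejects most operations on the value inside the method body, while `Any` leaves them unchecked:

       ```python
       from typing import Any


       def length_of_object(data: object) -> int:
           return len(data)  # mypy: "object" is not "Sized"


       def length_of_any(data: Any) -> int:
           return len(data)  # unchecked: Any silences the complaint
       ```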

##########
File path: python/pyspark/context.py
##########
@@ -959,37 +1084,42 @@ def union(self, rdds):
         >>> sorted(sc.union([textFile, parallelized]).collect())
         ['Hello', 'World!']
         """
-        first_jrdd_deserializer = rdds[0]._jrdd_deserializer
-        if any(x._jrdd_deserializer != first_jrdd_deserializer for x in rdds):
-            rdds = [x._reserialize() for x in rdds]
+        first_jrdd_deserializer = rdds[0]._jrdd_deserializer  # type: ignore[index]
+        if any(
+            x._jrdd_deserializer != first_jrdd_deserializer  # type: ignore[attr-defined]
+            for x in rdds
+        ):
+            rdds = [x._reserialize() for x in rdds]  # type: ignore[attr-defined]
         gw = SparkContext._gateway
         jvm = SparkContext._jvm
-        jrdd_cls = jvm.org.apache.spark.api.java.JavaRDD
-        jpair_rdd_cls = jvm.org.apache.spark.api.java.JavaPairRDD
-        jdouble_rdd_cls = jvm.org.apache.spark.api.java.JavaDoubleRDD
-        if is_instance_of(gw, rdds[0]._jrdd, jrdd_cls):
+        jrdd_cls = jvm.org.apache.spark.api.java.JavaRDD  # type: ignore[union-attr]
+        jpair_rdd_cls = jvm.org.apache.spark.api.java.JavaPairRDD  # type: ignore[union-attr]
+        jdouble_rdd_cls = jvm.org.apache.spark.api.java.JavaDoubleRDD  # type: ignore[union-attr]
+        if is_instance_of(gw, rdds[0]._jrdd, jrdd_cls):  # type: ignore[index]
             cls = jrdd_cls
-        elif is_instance_of(gw, rdds[0]._jrdd, jpair_rdd_cls):
+        elif is_instance_of(gw, rdds[0]._jrdd, jpair_rdd_cls):  # type: ignore[index]
             cls = jpair_rdd_cls
-        elif is_instance_of(gw, rdds[0]._jrdd, jdouble_rdd_cls):
+        elif is_instance_of(gw, rdds[0]._jrdd, jdouble_rdd_cls):  # type: ignore[index]
             cls = jdouble_rdd_cls
         else:
-            cls_name = rdds[0]._jrdd.getClass().getCanonicalName()
+            cls_name = rdds[0]._jrdd.getClass().getCanonicalName()  # type: ignore[index]
             raise TypeError("Unsupported Java RDD class %s" % cls_name)
-        jrdds = gw.new_array(cls, len(rdds))
-        for i in range(0, len(rdds)):
-            jrdds[i] = rdds[i]._jrdd
-        return RDD(self._jsc.union(jrdds), self, rdds[0]._jrdd_deserializer)
+        jrdds = gw.new_array(cls, len(rdds))  # type: ignore[arg-type, union-attr]
+        for i in range(0, len(rdds)):  # type: ignore[arg-type]
+            jrdds[i] = rdds[i]._jrdd  # type: ignore[index]
+        return RDD(self._jsc.union(jrdds), self, rdds[0]._jrdd_deserializer)  # type: ignore[index]
 
-    def broadcast(self, value):
+    def broadcast(self, value: T) -> "Broadcast[T]":
         """
        Broadcast a read-only variable to the cluster, returning a :class:`Broadcast`
         object for reading it in distributed functions. The variable will
         be sent to each cluster only once.
         """
         return Broadcast(self, value, self._pickled_broadcast_vars)
 
-    def accumulator(self, value, accum_param=None):
+    def accumulator(
+        self, value: T, accum_param: "Optional[AccumulatorParam[T]]" = None

Review comment:
       `Optional["AccumulatorParam[T]"]`
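
       Both spellings are valid forward references at runtime, but quoting only the name that is unavailable (here, presumably imported only under `TYPE_CHECKING`) keeps the rest of the annotation as real, checkable code. A minimal sketch of the suggested form, written as a free function for brevity:

       ```python
       from typing import TYPE_CHECKING, Optional, TypeVar

       if TYPE_CHECKING:
           from pyspark.accumulators import AccumulatorParam

       T = TypeVar("T")


       # Quote only "AccumulatorParam[T]"; Optional[...] stays unquoted.
       def accumulator(value: T, accum_param: Optional["AccumulatorParam[T]"] = None) -> T:
           return value
       ```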

##########
File path: python/pyspark/context.py
##########
@@ -118,20 +125,36 @@ class SparkContext(object):
     ValueError: ...
     """
 
-    _gateway = None
-    _jvm = None
-    _next_accum_id = 0
-    _active_spark_context = None
+    _gateway: Optional[JavaGateway] = None
+    _jvm: Optional[JavaObject] = None
+    _next_accum_id: int = 0
+    _active_spark_context: Optional["SparkContext"] = None
     _lock = RLock()

Review comment:
       Should those be `ClassVar`?

##########
File path: python/pyspark/context.py
##########
@@ -930,16 +1055,16 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
             Java object. (default 0, choose batchSize automatically)
         """
         jconf = self._dictToJavaMap(conf)
-        jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
-                                             valueClass, keyConverter, valueConverter,
-                                             jconf, batchSize)
+        jrdd = self._jvm.PythonRDD.hadoopRDD(  # type: ignore[union-attr]
+            self._jsc, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf,
+            batchSize)
         return RDD(jrdd, self)
 
-    def _checkpointFile(self, name, input_deserializer):
+    def _checkpointFile(self, name: str, input_deserializer: Serializer) -> RDD:
         jrdd = self._jsc.checkpointFile(name)
         return RDD(jrdd, self, input_deserializer)
 
-    def union(self, rdds):
+    def union(self, rdds: "Iterable[RDD[T]]") -> "RDD[T]":

Review comment:
       `Iterable["RDD[T]"]`?
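
       The same quoting convention as for `accum_param` above; a sketch of just the signature:

       ```python
       from typing import TYPE_CHECKING, Iterable, TypeVar

       if TYPE_CHECKING:
           from pyspark.rdd import RDD

       T = TypeVar("T")


       # Only the forward-referenced RDD needs the string form.
       def union(rdds: Iterable["RDD[T]"]) -> "RDD[T]":
           raise NotImplementedError  # signature sketch only
       ```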




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


