ueshin commented on a change in pull request #34238:
URL: https://github.com/apache/spark/pull/34238#discussion_r734034297
##########
File path: python/pyspark/context.py
##########
@@ -255,7 +298,8 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
# Deploy code dependencies set by spark-submit; these will already have been added
# with SparkContext.addFile, so we just need to add them to the PYTHONPATH
- for path in self._conf.get("spark.submit.pyFiles", "").split(","):
+ for path in self._conf.get(
+ "spark.submit.pyFiles", "").split(","): # type: ignore[union-attr]
Review comment:
Do we need the ignore here?
##########
File path: python/pyspark/context.py
##########
@@ -206,26 +239,36 @@ def _do_init(self, master, appName, sparkHome, pyFiles, environment, batchSize,
self.environment["PYTHONHASHSEED"] = os.environ.get("PYTHONHASHSEED", "0")
# Create the Java SparkContext through Py4J
- self._jsc = jsc or self._initialize_context(self._conf._jconf)
+ self._jsc: JavaObject = jsc or self._initialize_context(self._conf._jconf)
# Reset the SparkConf to the one actually used by the SparkContext in JVM.
- self._conf = SparkConf(_jconf=self._jsc.sc().conf())
+ self._conf = SparkConf(_jconf=self._jsc.sc().conf()) # type: ignore[union-attr]
Review comment:
Do we need the ignore here?
##########
File path: python/pyspark/files.py
##########
@@ -30,29 +35,30 @@ class SparkFiles(object):
instances.
"""
- _root_directory = None
- _is_running_on_worker = False
- _sc = None
+ _root_directory: Optional[str] = None
+ _is_running_on_worker: bool = False
+ _sc: Optional["SparkContext"] = None
Review comment:
Those are `ClassVar`?
##########
File path: python/pyspark/context.py
##########
@@ -565,16 +633,24 @@ def f(split, iterator):
batchSize = max(1, min(len(c) // numSlices, self._batchSize or 1024))
serializer = BatchedSerializer(self._unbatched_serializer, batchSize)
- def reader_func(temp_filename):
- return self._jvm.PythonRDD.readRDDFromFile(self._jsc, temp_filename, numSlices)
+ def reader_func(temp_filename: str) -> JavaObject:
+ return self._jvm.PythonRDD.readRDDFromFile( # type: ignore[union-attr]
+ self._jsc, temp_filename, numSlices)
- def createRDDServer():
- return self._jvm.PythonParallelizeServer(self._jsc.sc(), numSlices)
+ def createRDDServer() -> JavaObject:
+ return self._jvm.PythonParallelizeServer( # type: ignore[union-attr]
+ self._jsc.sc(), numSlices)
jrdd = self._serialize_to_jvm(c, serializer, reader_func, createRDDServer)
return RDD(jrdd, self, serializer)
- def _serialize_to_jvm(self, data, serializer, reader_func, createRDDServer):
+ def _serialize_to_jvm(
+ self,
+ data: object,
Review comment:
`Any`?
##########
File path: python/pyspark/context.py
##########
@@ -959,37 +1084,42 @@ def union(self, rdds):
>>> sorted(sc.union([textFile, parallelized]).collect())
['Hello', 'World!']
"""
- first_jrdd_deserializer = rdds[0]._jrdd_deserializer
- if any(x._jrdd_deserializer != first_jrdd_deserializer for x in rdds):
- rdds = [x._reserialize() for x in rdds]
+ first_jrdd_deserializer = rdds[0]._jrdd_deserializer # type: ignore[index]
+ if any(
+ x._jrdd_deserializer != first_jrdd_deserializer # type: ignore[attr-defined]
+ for x in rdds
+ ):
+ rdds = [x._reserialize() for x in rdds] # type: ignore[attr-defined]
gw = SparkContext._gateway
jvm = SparkContext._jvm
- jrdd_cls = jvm.org.apache.spark.api.java.JavaRDD
- jpair_rdd_cls = jvm.org.apache.spark.api.java.JavaPairRDD
- jdouble_rdd_cls = jvm.org.apache.spark.api.java.JavaDoubleRDD
- if is_instance_of(gw, rdds[0]._jrdd, jrdd_cls):
+ jrdd_cls = jvm.org.apache.spark.api.java.JavaRDD # type: ignore[union-attr]
+ jpair_rdd_cls = jvm.org.apache.spark.api.java.JavaPairRDD # type: ignore[union-attr]
+ jdouble_rdd_cls = jvm.org.apache.spark.api.java.JavaDoubleRDD # type: ignore[union-attr]
+ if is_instance_of(gw, rdds[0]._jrdd, jrdd_cls): # type: ignore[index]
cls = jrdd_cls
- elif is_instance_of(gw, rdds[0]._jrdd, jpair_rdd_cls):
+ elif is_instance_of(gw, rdds[0]._jrdd, jpair_rdd_cls): # type: ignore[index]
cls = jpair_rdd_cls
- elif is_instance_of(gw, rdds[0]._jrdd, jdouble_rdd_cls):
+ elif is_instance_of(gw, rdds[0]._jrdd, jdouble_rdd_cls): # type: ignore[index]
cls = jdouble_rdd_cls
else:
- cls_name = rdds[0]._jrdd.getClass().getCanonicalName()
+ cls_name = rdds[0]._jrdd.getClass().getCanonicalName() # type: ignore[index]
raise TypeError("Unsupported Java RDD class %s" % cls_name)
- jrdds = gw.new_array(cls, len(rdds))
- for i in range(0, len(rdds)):
- jrdds[i] = rdds[i]._jrdd
- return RDD(self._jsc.union(jrdds), self, rdds[0]._jrdd_deserializer)
+ jrdds = gw.new_array(cls, len(rdds)) # type: ignore[arg-type, union-attr]
+ for i in range(0, len(rdds)): # type: ignore[arg-type]
+ jrdds[i] = rdds[i]._jrdd # type: ignore[index]
+ return RDD(self._jsc.union(jrdds), self, rdds[0]._jrdd_deserializer) # type: ignore[index]
- def broadcast(self, value):
+ def broadcast(self, value: T) -> "Broadcast[T]":
"""
Broadcast a read-only variable to the cluster, returning a
:class:`Broadcast`
object for reading it in distributed functions. The variable will
be sent to each cluster only once.
"""
return Broadcast(self, value, self._pickled_broadcast_vars)
- def accumulator(self, value, accum_param=None):
+ def accumulator(
+ self, value: T, accum_param: "Optional[AccumulatorParam[T]]" = None
Review comment:
`Optional["AccumulatorParam[T]"]`
##########
File path: python/pyspark/context.py
##########
@@ -118,20 +125,36 @@ class SparkContext(object):
ValueError: ...
"""
- _gateway = None
- _jvm = None
- _next_accum_id = 0
- _active_spark_context = None
+ _gateway: Optional[JavaGateway] = None
+ _jvm: Optional[JavaObject] = None
+ _next_accum_id: int = 0
+ _active_spark_context: Optional["SparkContext"] = None
_lock = RLock()
Review comment:
Those should be `ClassVar`?
##########
File path: python/pyspark/context.py
##########
@@ -930,16 +1055,16 @@ def hadoopRDD(self, inputFormatClass, keyClass, valueClass, keyConverter=None,
Java object. (default 0, choose batchSize automatically)
"""
jconf = self._dictToJavaMap(conf)
- jrdd = self._jvm.PythonRDD.hadoopRDD(self._jsc, inputFormatClass, keyClass,
- valueClass, keyConverter, valueConverter,
- jconf, batchSize)
+ jrdd = self._jvm.PythonRDD.hadoopRDD( # type: ignore[union-attr]
+ self._jsc, inputFormatClass, keyClass, valueClass, keyConverter, valueConverter, jconf,
+ batchSize)
return RDD(jrdd, self)
- def _checkpointFile(self, name, input_deserializer):
+ def _checkpointFile(self, name: str, input_deserializer: Serializer) -> RDD:
jrdd = self._jsc.checkpointFile(name)
return RDD(jrdd, self, input_deserializer)
- def union(self, rdds):
+ def union(self, rdds: "Iterable[RDD[T]]") -> "RDD[T]":
Review comment:
`Iterable["RDD[T]"]`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]