itholic commented on a change in pull request #34293:
URL: https://github.com/apache/spark/pull/34293#discussion_r787298665



##########
File path: python/pyspark/streaming/context.py
##########
@@ -264,7 +280,9 @@ def checkpoint(self, directory):
         """
         self._jssc.checkpoint(directory)
 
-    def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_DISK_2):
+    def socketTextStream(
+        self, hostname: str, port: int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_2
+    ) -> "DStream[str]":

Review comment:
       I think we can directly use `DStream[str]` here.
   
   `"DStream[str]"` -> `DStream[str]` should be fine.

##########
File path: python/pyspark/streaming/context.py
##########
@@ -279,12 +297,12 @@ def socketTextStream(self, hostname, port, storageLevel=StorageLevel.MEMORY_AND_
         storageLevel : :class:`pyspark.StorageLevel`, optional
             Storage level to use for storing the received objects
         """
-        jlevel = self._sc._getJavaStorageLevel(storageLevel)
+        jlevel = self._sc._getJavaStorageLevel(storageLevel)  # type: ignore[attr-defined]
         return DStream(
             self._jssc.socketTextStream(hostname, port, jlevel), self, UTF8Deserializer()
         )
 
-    def textFileStream(self, directory):
+    def textFileStream(self, directory: str) -> "DStream[str]":

Review comment:
       ditto

##########
File path: python/pyspark/streaming/context.py
##########
@@ -313,14 +331,19 @@ def binaryRecordsStream(self, directory, recordLength):
             self._jssc.binaryRecordsStream(directory, recordLength), self, NoOpSerializer()
         )
 
-    def _check_serializers(self, rdds):
+    def _check_serializers(self, rdds: List["RDD[T]"]) -> None:

Review comment:
       Here, too.
   
   `List["RDD[T]"]` -> `List[RDD[T]]` should be fine.

##########
File path: python/pyspark/streaming/context.py
##########
@@ -313,14 +331,19 @@ def binaryRecordsStream(self, directory, recordLength):
             self._jssc.binaryRecordsStream(directory, recordLength), self, NoOpSerializer()
         )
 
-    def _check_serializers(self, rdds):
+    def _check_serializers(self, rdds: List["RDD[T]"]) -> None:
         # make sure they have same serializer
-        if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1:
+        if len(set(rdd._jrdd_deserializer for rdd in rdds)) > 1:  # type: ignore[attr-defined]
             for i in range(len(rdds)):
                 # reset them to sc.serializer
-                rdds[i] = rdds[i]._reserialize()
+                rdds[i] = rdds[i]._reserialize()  # type: ignore[attr-defined]
 
-    def queueStream(self, rdds, oneAtATime=True, default=None):
+    def queueStream(
+        self,
+        rdds: List["RDD[T]"],
+        oneAtATime: bool = True,
+        default: Optional["RDD[T]"] = None,
+    ) -> "DStream[T]":

Review comment:
       ditto
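
As a quick sanity check of the annotated signature, a hedged usage sketch (this assumes a local pyspark installation; the app name and batch interval are illustrative, not from the PR):

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "queueStreamTypingSketch")
ssc = StreamingContext(sc, batchDuration=1)

# With rdds: List[RDD[T]], a checker can infer DStream[int] for `stream`:
rdd_queue = [sc.parallelize(range(5)) for _ in range(3)]
stream = ssc.queueStream(rdd_queue, oneAtATime=True)
stream.pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(5)
ssc.stop(stopSparkContext=True)
```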

##########
File path: python/pyspark/streaming/context.py
##########
@@ -294,7 +312,7 @@ def textFileStream(self, directory):
         """
         return DStream(self._jssc.textFileStream(directory), self, UTF8Deserializer())
 
-    def binaryRecordsStream(self, directory, recordLength):
+    def binaryRecordsStream(self, directory: str, recordLength: int) -> "DStream[bytes]":

Review comment:
       ditto

##########
File path: python/pyspark/streaming/context.py
##########
@@ -339,42 +362,48 @@ def queueStream(self, rdds, oneAtATime=True, default=None):
         Changes to the queue after the stream is created will not be recognized.
         """
         if default and not isinstance(default, RDD):
-            default = self._sc.parallelize(default)
+            default = self._sc.parallelize(default)  # type: ignore[arg-type]
 
         if not rdds and default:
-            rdds = [rdds]
+            rdds = [rdds]  # type: ignore[list-item]
 
         if rdds and not isinstance(rdds[0], RDD):
-            rdds = [self._sc.parallelize(input) for input in rdds]
+            rdds = [self._sc.parallelize(input) for input in rdds]  # type: ignore[arg-type]
         self._check_serializers(rdds)
 
-        queue = self._jvm.PythonDStream.toRDDQueue([r._jrdd for r in rdds])
+        queue = self._jvm.PythonDStream.toRDDQueue(
+            [r._jrdd for r in rdds]  # type: ignore[attr-defined]
+        )
         if default:
-            default = default._reserialize(rdds[0]._jrdd_deserializer)
-            jdstream = self._jssc.queueStream(queue, oneAtATime, default._jrdd)
+            default = default._reserialize(rdds[0]._jrdd_deserializer)  # type: ignore[attr-defined]
+            jdstream = self._jssc.queueStream(
+                queue, oneAtATime, default._jrdd  # type: ignore[union-attr, attr-defined]
+            )
         else:
             jdstream = self._jssc.queueStream(queue, oneAtATime)
-        return DStream(jdstream, self, rdds[0]._jrdd_deserializer)
+        return DStream(jdstream, self, rdds[0]._jrdd_deserializer)  # type: ignore[attr-defined]
 
-    def transform(self, dstreams, transformFunc):
+    def transform(
+        self, dstreams: List["DStream[Any]"], transformFunc: Callable[..., "RDD[T]"]
+    ) -> "DStream[T]":
         """
         Create a new DStream in which each RDD is generated by applying
         a function on RDDs of the DStreams. The order of the JavaRDDs in
         the transform function parameter will be the same as the order
         of corresponding DStreams in the list.
         """
-        jdstreams = [d._jdstream for d in dstreams]
+        jdstreams = [d._jdstream for d in dstreams]  # type: ignore[attr-defined]
         # change the final serializer to sc.serializer
         func = TransformFunction(
             self._sc,
             lambda t, *rdds: transformFunc(rdds),
-            *[d._jrdd_deserializer for d in dstreams],
+            *[d._jrdd_deserializer for d in dstreams],  # type: ignore[attr-defined]
         )
         jfunc = self._jvm.TransformFunction(func)
         jdstream = self._jssc.transform(jdstreams, jfunc)
-        return DStream(jdstream, self, self._sc.serializer)
+        return DStream(jdstream, self, self._sc.serializer)  # type: ignore[attr-defined]
 
-    def union(self, *dstreams):
+    def union(self, *dstreams: "DStream[T]") -> "DStream[T]":

Review comment:
       ditto
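
And a similar hedged sketch for the `transform` hunk above (again assuming a local pyspark installation; all names are illustrative). Note that, per the lambda in the hunk, `transformFunc` receives the batch's RDDs as a single tuple, in the same order as the DStream list, and must return one RDD; that is what the `Callable[..., "RDD[T]"]` annotation has to accommodate:

```python
from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "transformTypingSketch")
ssc = StreamingContext(sc, batchDuration=1)

evens = ssc.queueStream([sc.parallelize([0, 2, 4])])
odds = ssc.queueStream([sc.parallelize([1, 3, 5])])

# transformFunc is handed the batch RDDs as one tuple, ordered like the
# DStream list, and returns a single RDD:
merged = ssc.transform([evens, odds], lambda rdds: rdds[0].union(rdds[1]))
merged.pprint()

ssc.start()
ssc.awaitTerminationOrTimeout(5)
ssc.stop(stopSparkContext=True)
```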




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


