vinodkc commented on code in PR #54237:
URL: https://github.com/apache/spark/pull/54237#discussion_r2789899658


##########
python/pyspark/errors/error-conditions.json:
##########
@@ -1210,6 +1210,11 @@
       "Cannot serialize the function `<name>`. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQueryListener`, please access the Spark session using `self.spark`. For details please check out the PySpark doc for `foreachBatch` and `StreamingQueryListener`."
     ]
   },
+  "STREAM_READER_OFFSET_DID_NOT_ADVANCE": {

Review Comment:
   Done



##########
python/pyspark/sql/datasource_internal.py:
##########
@@ -93,14 +93,33 @@ def getDefaultReadLimit(self) -> ReadLimit:
         # We do not consider providing different read limit on simple stream reader.
         return ReadAllAvailable()
 
+    def _validate_read_result(self, start: dict, end: dict, it: Iterator[Tuple]) -> None:
+        """
+        Validates that read() did not return a non-empty batch with end equal to start,
+        which would cause the same batch to be processed repeatedly. When end != start,
+        appends the result to the cache; when end == start with empty iterator, does not
+        cache (avoids unbounded cache growth).
+        """
+        if json.dumps(end) != json.dumps(start):
+            self.cache.append(PrefetchedCacheEntry(start, end, it))
+            return
+        try:
+            next(it)
+        except StopIteration:
+            return
+        raise PySparkException(
+            errorClass="STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+            messageParameters={},
+        )
+
     def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
         assert start is not None, "start offset should not be None"
         assert isinstance(
             limit, ReadAllAvailable
         ), "simple stream reader does not support read limit"
 
         (iter, end) = self.simple_reader.read(start)
-        self.cache.append(PrefetchedCacheEntry(start, end, iter))
+        self._validate_read_result(start, end, iter)

Review Comment:
   Done
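
For readers following the thread, the three outcomes of the check in the hunk above can be reproduced outside Spark. This is only a minimal sketch, not the PR's code: `OffsetDidNotAdvanceError` is a hypothetical stand-in for the `PySparkException` raised with `STREAM_READER_OFFSET_DID_NOT_ADVANCE`, a plain list of tuples stands in for the `PrefetchedCacheEntry` cache, and the `{"offset": N}` dicts are made-up offsets.

```python
import json
from typing import Iterator, List, Tuple


class OffsetDidNotAdvanceError(Exception):
    """Hypothetical stand-in for the PySparkException in the diff."""


def validate_read_result(
    cache: List[Tuple[dict, dict, Iterator[Tuple]]],
    start: dict,
    end: dict,
    it: Iterator[Tuple],
) -> None:
    # Offsets are compared by their JSON text, mirroring the diff.
    if json.dumps(end) != json.dumps(start):
        # The offset advanced: keep the prefetched batch.
        cache.append((start, end, it))
        return
    try:
        # Offset unchanged: probe whether the batch holds any row.
        next(it)
    except StopIteration:
        # Empty batch with an unchanged offset: nothing to cache.
        return
    # Non-empty batch with an unchanged offset would be replayed forever.
    raise OffsetDidNotAdvanceError("end offset equals start offset")


cache: List[Tuple[dict, dict, Iterator[Tuple]]] = []

# Case 1: offset advances, so the batch is cached.
validate_read_result(cache, {"offset": 0}, {"offset": 2}, iter([(0,), (1,)]))

# Case 2: no new data and the offset is unchanged, so nothing is cached.
validate_read_result(cache, {"offset": 2}, {"offset": 2}, iter([]))

# Case 3: rows returned but the offset is unchanged, so an error is raised.
try:
    validate_read_result(cache, {"offset": 2}, {"offset": 2}, iter([(2,)]))
    raised = False
except OffsetDidNotAdvanceError:
    raised = True
```

Note that the probe in the unchanged-offset branch consumes one row from the iterator. That is harmless here because the only path that continues after a successful `next(it)` is the one that raises.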



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
