vinodkc commented on code in PR #54237:
URL: https://github.com/apache/spark/pull/54237#discussion_r2789916465


##########
python/pyspark/sql/datasource_internal.py:
##########
@@ -93,14 +93,33 @@ def getDefaultReadLimit(self) -> ReadLimit:
         # We do not consider providing different read limit on simple stream 
reader.
         return ReadAllAvailable()
 
+    def _validate_read_result(self, start: dict, end: dict, it: 
Iterator[Tuple]) -> None:
+        """
+        Validates that read() did not return a non-empty batch with end equal 
to start,
+        which would cause the same batch to be processed repeatedly. When end 
!= start,
+        appends the result to the cache; when end == start with empty 
iterator, does not
+        cache (avoids unbounded cache growth).
+        """
+        if json.dumps(end) != json.dumps(start):

Review Comment:
   I followed similar json.dumps comparison in `commit` and `getCache` . 
   Now, to pass the offsets to Exception message, we need to convert dict to 
json using json.dumps



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to