vinodkc commented on code in PR #54237:
URL: https://github.com/apache/spark/pull/54237#discussion_r2789921645
##########
python/pyspark/errors/error-conditions.json:
##########
@@ -1210,6 +1210,11 @@
"Cannot serialize the function `<name>`. If you accessed the Spark session, or a DataFrame defined outside of the function, or any object that contains a Spark session, please be aware that they are not allowed in Spark Connect. For `foreachBatch`, please access the Spark session using `df.sparkSession`, where `df` is the first parameter in your `foreachBatch` function. For `StreamingQueryListener`, please access the Spark session using `self.spark`. For details please check out the PySpark doc for `foreachBatch` and `StreamingQueryListener`."
]
},
+ "STREAM_READER_OFFSET_DID_NOT_ADVANCE": {
+ "message": [
+ "Stream reader read() returned a non-empty batch but the end offset did not advance past the start offset. Returning end equal to start with data would cause the same batch to be processed repeatedly. The end offset must represent the position after the last record returned."
Review Comment:
Done
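The new error condition encodes a contract for readers: whenever read() returns rows, the end offset must point to the position just after the last record returned. The sketch below illustrates that contract in plain Python without importing PySpark. `CounterReader`, `RECORDS`, and the `{"offset": n}` offset layout are hypothetical, chosen only to mirror the shape of PySpark's `SimpleDataSourceStreamReader.read(start)`, which (as we understand the API) returns an iterator of rows together with the offset the next read should start from.

```python
from typing import Dict, Iterator, List, Tuple

# Hypothetical in-memory source standing in for whatever a real reader polls.
RECORDS: List[Tuple[int, str]] = [(i, f"row-{i}") for i in range(10)]


class CounterReader:
    """Illustrative reader whose offsets look like {"offset": <next index to read>}."""

    def __init__(self, batch_size: int = 4) -> None:
        self.batch_size = batch_size

    def read(
        self, start: Dict[str, int]
    ) -> Tuple[Iterator[Tuple[int, str]], Dict[str, int]]:
        begin = start["offset"]
        batch = RECORDS[begin : begin + self.batch_size]
        # The end offset is the position *after* the last record returned, so it
        # advances by exactly len(batch). Only an empty batch leaves end == start.
        end = {"offset": begin + len(batch)}
        return iter(batch), end
```

Reading from `{"offset": 0}` yields rows 0 through 3 and an end offset of `{"offset": 4}`; reading from `{"offset": 10}` yields no rows and returns the start offset unchanged, which is the one case where equal offsets are acceptable.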
##########
python/pyspark/sql/datasource_internal.py:
##########
@@ -93,14 +93,33 @@ def getDefaultReadLimit(self) -> ReadLimit:
         # We do not consider providing different read limit on simple stream reader.
         return ReadAllAvailable()
+    def _validate_read_result(self, start: dict, end: dict, it: Iterator[Tuple]) -> None:
+        """
+        Validates that read() did not return a non-empty batch with end equal to start,
+        which would cause the same batch to be processed repeatedly. When end != start,
+        appends the result to the cache; when end == start with empty iterator, does not
+        cache (avoids unbounded cache growth).
+        """
+        if json.dumps(end) != json.dumps(start):
+            self.cache.append(PrefetchedCacheEntry(start, end, it))
+            return
+        try:
+            next(it)
+        except StopIteration:
+            return
+        raise PySparkException(
+            errorClass="STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+            messageParameters={},
Review Comment:
Done
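The helper quoted above has three outcomes: cache the batch when the offset moved, silently drop an empty batch when it did not, and raise when rows came back without progress. A minimal standalone sketch of the same decision logic follows; `validate_read_result`, `OffsetDidNotAdvanceError`, and the plain list used as the cache are hypothetical stand-ins for `_validate_read_result`, `PySparkException`, and `self.cache` of `PrefetchedCacheEntry` objects.

```python
import json
from typing import Any, Dict, Iterator, List, Tuple

Offset = Dict[str, Any]


class OffsetDidNotAdvanceError(Exception):
    """Hypothetical stand-in for PySparkException with
    errorClass="STREAM_READER_OFFSET_DID_NOT_ADVANCE"."""


def validate_read_result(
    cache: List[Tuple[Offset, Offset, Iterator[Tuple]]],
    start: Offset,
    end: Offset,
    it: Iterator[Tuple],
) -> None:
    # Compare offsets by JSON serialization; sort_keys makes the comparison
    # independent of dict insertion order.
    if json.dumps(end, sort_keys=True) != json.dumps(start, sort_keys=True):
        # Offset advanced: keep the batch so it can be served later.
        cache.append((start, end, it))
        return
    try:
        # end == start: peek at one row to learn whether the batch is empty.
        next(it)
    except StopIteration:
        # Empty batch with no progress is fine. It is not cached, so repeated
        # idle polls do not grow the cache without bound.
        return
    # Rows were returned but the offset did not move: the next read from
    # `start` would return the same rows again, so fail fast instead.
    raise OffsetDidNotAdvanceError(
        "read() returned rows but the end offset equals the start offset"
    )
```

One deliberate difference from the quoted PR code: the sketch passes `sort_keys=True` to `json.dumps`, so two offset dicts with the same contents but a different key order compare equal. Peeking with `next(it)` consumes at most one row, which is harmless because a non-empty result in the end == start case is rejected anyway.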
--
This is an automated message from the Apache Git Service.