HeartSaVioR commented on code in PR #54237:
URL: https://github.com/apache/spark/pull/54237#discussion_r2786156180
##########
python/pyspark/sql/tests/test_python_streaming_datasource.py:
##########
@@ -509,6 +510,60 @@ def check_batch(df, batch_id):
q.awaitTermination(timeout=30)
self.assertIsNone(q.exception(), "No exception has to be propagated.")
+ def test_simple_stream_reader_offset_did_not_advance_raises(self):
+ """Validate that returning end == start with non-empty data raises
STREAM_READER_OFFSET_DID_NOT_ADVANCE."""
+ from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper
+
+ class BuggySimpleStreamReader(SimpleDataSourceStreamReader):
+ def initialOffset(self):
+ return {"offset": 0}
+
+ def read(self, start: dict):
+ # Bug: return same offset as end despite returning data
+ start_idx = start["offset"]
+ it = iter([(i,) for i in range(start_idx, start_idx + 3)])
+ return (it, {"offset": start_idx})
+
+ def readBetweenOffsets(self, start: dict, end: dict):
+ return iter([])
+
+ def commit(self, end: dict):
+ pass
+
+ reader = BuggySimpleStreamReader()
+ wrapper = _SimpleStreamReaderWrapper(reader)
+ with self.assertRaises(PySparkException) as cm:
+ wrapper.latestOffset({"offset": 0}, ReadAllAvailable())
+ self.assertEqual(
+ cm.exception.getCondition(),
+ "STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+ )
+
+ def
test_simple_stream_reader_empty_iterator_start_equals_end_allowed(self):
+ """When read() returns end == start with an empty iterator, no
exception and no cache entry."""
+ from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper
+
+ class EmptyBatchReader(SimpleDataSourceStreamReader):
+ def initialOffset(self):
+ return {"offset": 0}
+
+ def read(self, start: dict):
+ # Valid: same offset as end but empty iterator (no data)
+ return (iter([]), {"offset": start["offset"]})
Review Comment:
nit: same here
##########
python/pyspark/sql/datasource_internal.py:
##########
@@ -93,14 +93,33 @@ def getDefaultReadLimit(self) -> ReadLimit:
# We do not consider providing different read limit on simple stream
reader.
return ReadAllAvailable()
+ def _validate_read_result(self, start: dict, end: dict, it:
Iterator[Tuple]) -> None:
+ """
+ Validates that read() did not return a non-empty batch with end equal
to start,
+ which would cause the same batch to be processed repeatedly. When end
!= start,
+ appends the result to the cache; when end == start with empty
iterator, does not
+ cache (avoids unbounded cache growth).
+ """
+ if json.dumps(end) != json.dumps(start):
+ self.cache.append(PrefetchedCacheEntry(start, end, it))
+ return
+ try:
+ next(it)
+ except StopIteration:
+ return
+ raise PySparkException(
+ errorClass="STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+ messageParameters={},
Review Comment:
nit: maybe good to provide the offset information for debugging.
##########
python/pyspark/sql/datasource_internal.py:
##########
@@ -93,14 +93,33 @@ def getDefaultReadLimit(self) -> ReadLimit:
# We do not consider providing different read limit on simple stream
reader.
return ReadAllAvailable()
+ def _validate_read_result(self, start: dict, end: dict, it:
Iterator[Tuple]) -> None:
+ """
+ Validates that read() did not return a non-empty batch with end equal
to start,
+ which would cause the same batch to be processed repeatedly. When end
!= start,
+ appends the result to the cache; when end == start with empty
iterator, does not
+ cache (avoids unbounded cache growth).
+ """
+ if json.dumps(end) != json.dumps(start):
Review Comment:
Do you intend to dump it to json to compare while both of them are dict?
##########
python/pyspark/errors/error-conditions.json:
##########
@@ -1210,6 +1210,11 @@
"Cannot serialize the function `<name>`. If you accessed the Spark
session, or a DataFrame defined outside of the function, or any object that
contains a Spark session, please be aware that they are not allowed in Spark
Connect. For `foreachBatch`, please access the Spark session using
`df.sparkSession`, where `df` is the first parameter in your `foreachBatch`
function. For `StreamingQueryListener`, please access the Spark session using
`self.spark`. For details please check out the PySpark doc for `foreachBatch`
and `StreamingQueryListener`."
]
},
+ "STREAM_READER_OFFSET_DID_NOT_ADVANCE": {
Review Comment:
nit: shall we clarify "simple" stream reader?
##########
python/docs/source/tutorial/sql/python_data_source.rst:
##########
@@ -309,7 +309,13 @@ This is the same dummy streaming reader that generates 2
rows every batch implem
def read(self, start: dict) -> Tuple[Iterator[Tuple], dict]:
"""
Takes start offset as an input, return an iterator of tuples and
- the start offset of next read.
+ the end offset (start offset for the next read). The end offset
must
+ advance past the start offset when returning data; otherwise Spark
+ raises a validation exception.
+ For example, returning 2 records from start_idx 0 means end should
+ be {"offset": 2} (i.e. start + 2).
+ When there is no data to read, you may return the same offset as
end and
+ start,but you must provide an empty iterator.
Review Comment:
nit: space between `,` and `b`
##########
python/pyspark/sql/datasource_internal.py:
##########
@@ -93,14 +93,33 @@ def getDefaultReadLimit(self) -> ReadLimit:
# We do not consider providing different read limit on simple stream
reader.
return ReadAllAvailable()
+ def _validate_read_result(self, start: dict, end: dict, it:
Iterator[Tuple]) -> None:
+ """
+ Validates that read() did not return a non-empty batch with end equal
to start,
+ which would cause the same batch to be processed repeatedly. When end
!= start,
+ appends the result to the cache; when end == start with empty
iterator, does not
+ cache (avoids unbounded cache growth).
+ """
+ if json.dumps(end) != json.dumps(start):
+ self.cache.append(PrefetchedCacheEntry(start, end, it))
+ return
+ try:
+ next(it)
+ except StopIteration:
+ return
+ raise PySparkException(
+ errorClass="STREAM_READER_OFFSET_DID_NOT_ADVANCE",
+ messageParameters={},
+ )
+
def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
assert start is not None, "start offset should not be None"
assert isinstance(
limit, ReadAllAvailable
), "simple stream reader does not support read limit"
(iter, end) = self.simple_reader.read(start)
- self.cache.append(PrefetchedCacheEntry(start, end, iter))
+ self._validate_read_result(start, end, iter)
Review Comment:
nit: the method name seems to give confusion that we only validate the
result and does not do more thing. Probably keep the method name to be like
`add_result_to_cache`, but simply performs the verification into it?
##########
python/pyspark/sql/tests/test_python_streaming_datasource.py:
##########
@@ -509,6 +510,60 @@ def check_batch(df, batch_id):
q.awaitTermination(timeout=30)
self.assertIsNone(q.exception(), "No exception has to be propagated.")
+ def test_simple_stream_reader_offset_did_not_advance_raises(self):
+ """Validate that returning end == start with non-empty data raises
STREAM_READER_OFFSET_DID_NOT_ADVANCE."""
+ from pyspark.sql.datasource_internal import _SimpleStreamReaderWrapper
+
+ class BuggySimpleStreamReader(SimpleDataSourceStreamReader):
+ def initialOffset(self):
+ return {"offset": 0}
+
+ def read(self, start: dict):
+ # Bug: return same offset as end despite returning data
+ start_idx = start["offset"]
+ it = iter([(i,) for i in range(start_idx, start_idx + 3)])
+ return (it, {"offset": start_idx})
Review Comment:
nit: why not just returning `start`?
##########
python/pyspark/errors/error-conditions.json:
##########
@@ -1210,6 +1210,11 @@
"Cannot serialize the function `<name>`. If you accessed the Spark
session, or a DataFrame defined outside of the function, or any object that
contains a Spark session, please be aware that they are not allowed in Spark
Connect. For `foreachBatch`, please access the Spark session using
`df.sparkSession`, where `df` is the first parameter in your `foreachBatch`
function. For `StreamingQueryListener`, please access the Spark session using
`self.spark`. For details please check out the PySpark doc for `foreachBatch`
and `StreamingQueryListener`."
]
},
+ "STREAM_READER_OFFSET_DID_NOT_ADVANCE": {
+ "message": [
+ "Stream reader read() returned a non-empty batch but the end offset did
not advance past the start offset. Returning end equal to start with data would
cause the same batch to be processed repeatedly. The end offset must represent
the position after the last record returned."
Review Comment:
nit:
> Returning end equal to start with data would cause the same batch to be
processed repeatedly.
I'd just remove it.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]