gaogaotiantian commented on code in PR #54085:
URL: https://github.com/apache/spark/pull/54085#discussion_r2767511348
##########
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##########
@@ -116,6 +138,77 @@ def send_batch_func(
write_int(EMPTY_PYARROW_RECORD_BATCHES, outfile)
+def check_support_func(reader: DataSourceStreamReader, outfile: IO) -> None:
+ support_flags = 0
+ if isinstance(reader, _SimpleStreamReaderWrapper):
Review Comment:
I'm surprised that we wrote `_SimpleStreamReaderWrapper` as a subclass
of `DataSourceStreamReader` yet still need a separate if statement
for it - that, to me, goes against the point of inheritance...
However, in this case, do we really need to?
The same `inspect.signature` logic applies to
`_SimpleStreamReaderWrapper`, because it does have the correct signature for
`reader.latestOffset`. `SupportsTriggerAvailableNow` is not the important thing;
the important thing is `prepareForTriggerAvailableNow`. We should have
read-through logic on `_SimpleStreamReaderWrapper` so that attribute lookups
like `hasattr(reader, "prepareForTriggerAvailableNow")` fall through to the
underlying `simple_reader` directly (see the sketch below).
Maybe we don't need to do that in this PR, but we should not claim that
`_SimpleStreamReaderWrapper` is a `DataSourceStreamReader` and still need a
special case to reach into `simple_reader` every time.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]