HeartSaVioR commented on code in PR #54085:
URL: https://github.com/apache/spark/pull/54085#discussion_r2771544709
##########
python/pyspark/sql/streaming/python_streaming_source_runner.py:
##########
@@ -116,6 +138,77 @@ def send_batch_func(
write_int(EMPTY_PYARROW_RECORD_BATCHES, outfile)
+def check_support_func(reader: DataSourceStreamReader, outfile: IO) -> None:
+ support_flags = 0
+ if isinstance(reader, _SimpleStreamReaderWrapper):
Review Comment:
I agree with the general concern, though there is a tricky issue I had to
deal with here. You may not be aware of the logic on the Scala side, so you
may be missing some context.

There is a two-step sequence:
1) determine whether the data source supports trigger.availableNow (the Scala
side has to pick a different implementation extending the interface depending
on the answer), and
2) call prepareForTriggerAvailableNow only when the data source reports that it
supports trigger.availableNow.
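
A minimal sketch of that two-step handshake, assuming support is declared through a mixin type. Every name here is a hypothetical stand-in and not the PR's actual code: `StreamReader` stands in for a simplified `DataSourceStreamReader`, and `SupportsTriggerAvailableNow` (as a Python mixin), `check_support`, `start_available_now` and the flag value are all illustrative. The point is that step 1 only inspects the reader's type, so step 2 can invoke `prepareForTriggerAvailableNow` exactly once.

```python
# Hypothetical capability bit; the real flag values used by the PR are not shown here.
SUPPORTS_TRIGGER_AVAILABLE_NOW = 1 << 0


class StreamReader:
    """Hypothetical, simplified stand-in for DataSourceStreamReader."""

    def latestOffset(self) -> dict:
        raise NotImplementedError


class SupportsTriggerAvailableNow:
    """Hypothetical mixin: a reader opts in by inheriting from it."""

    def prepareForTriggerAvailableNow(self) -> None:
        raise NotImplementedError


def check_support(reader: StreamReader) -> int:
    # Step 1: report capabilities by type inspection only; no reader method runs.
    flags = 0
    if isinstance(reader, SupportsTriggerAvailableNow):
        flags |= SUPPORTS_TRIGGER_AVAILABLE_NOW
    return flags


def start_available_now(reader: StreamReader) -> bool:
    # Step 2: call prepare exactly once, and only if step 1 reported support.
    if check_support(reader) & SUPPORTS_TRIGGER_AVAILABLE_NOW:
        reader.prepareForTriggerAvailableNow()
        return True
    return False


class CountingReader(StreamReader, SupportsTriggerAvailableNow):
    """Example source that records how often prepare is invoked."""

    def __init__(self) -> None:
        self.prepare_calls = 0

    def prepareForTriggerAvailableNow(self) -> None:
        self.prepare_calls += 1


class PlainReader(StreamReader):
    """Example source that does not opt in."""
```

With this shape, `start_available_now(CountingReader())` prepares once and returns `True`, while `start_available_now(PlainReader())` returns `False` without touching the reader. The cost is the `isinstance` check on the runner side, which is the manual pattern matching under discussion.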

Having prepareForTriggerAvailableNow raise NotImplementedError by default does
not work, since the engine would then have to call the method once to probe for
support in step 1 and again in step 2, and we don't want to call it twice just
because of an implementation detail in the engine. That could cause side effects
in source implementations unless we force them to be idempotent.
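
To illustrate the double-call problem, here is a hypothetical sketch (not PySpark API; `BaseReader`, `SnapshotReader`, `supports_available_now_by_probing` and `run_with_probe` are illustrative names) where support can only be discovered by calling the method and catching `NotImplementedError`. It assumes, per the sequence above, that the engine needs the answer before it is ready to prepare, so the probe cannot double as the real call.

```python
class BaseReader:
    """Hypothetical base class whose default prepare raises NotImplementedError."""

    def prepareForTriggerAvailableNow(self) -> None:
        raise NotImplementedError


def supports_available_now_by_probing(reader: BaseReader) -> bool:
    # The only way to learn about support is to actually invoke the method.
    try:
        reader.prepareForTriggerAvailableNow()
        return True
    except NotImplementedError:
        return False


class SnapshotReader(BaseReader):
    """Example source whose prepare has a side effect (counted here)."""

    def __init__(self) -> None:
        self.prepare_calls = 0

    def prepareForTriggerAvailableNow(self) -> None:
        # Stand-in for a real side effect, e.g. pinning the target offset.
        self.prepare_calls += 1


def run_with_probe(reader: BaseReader) -> None:
    if supports_available_now_by_probing(reader):  # call 1: step 1 (probe)
        reader.prepareForTriggerAvailableNow()  # call 2: step 2 (real prepare)
```

After `run_with_probe(SnapshotReader())` the reader has been prepared twice, which is harmless only if every source makes `prepareForTriggerAvailableNow` idempotent.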

Introducing supportTriggerAvailableNow directly into DataSourceStreamReader
might be a viable alternative, but it would be a clear divergence between Scala
and Python in how the interface is used, and keeping the two in sync has been an
implicit direction of the Python data source API.
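
For comparison, a hypothetical sketch of that alternative: an explicit `supportTriggerAvailableNow()` method on the base reader, defaulting to `False`. The class names `ReaderWithSupportFlag` and `MyReader` and the helper `engine_check` are illustrative only. In Scala, support is signalled by implementing the `SupportsTriggerAvailableNow` interface rather than by a boolean method, which is where the two APIs would drift apart.

```python
class ReaderWithSupportFlag:
    """Hypothetical base reader: support is declared via an overridable method."""

    def supportTriggerAvailableNow(self) -> bool:
        return False  # default: not supported

    def prepareForTriggerAvailableNow(self) -> None:
        raise NotImplementedError


class MyReader(ReaderWithSupportFlag):
    """Example source that opts in by overriding both methods."""

    def __init__(self) -> None:
        self.prepared = False

    def supportTriggerAvailableNow(self) -> bool:
        return True

    def prepareForTriggerAvailableNow(self) -> None:
        self.prepared = True


def engine_check(reader: ReaderWithSupportFlag) -> bool:
    # No isinstance check needed, and asking does not trigger prepare.
    return reader.supportTriggerAvailableNow()
```

This removes the pattern matching on the runner side, but source authors must keep the boolean and the prepare override consistent by hand, which a type-based opt-in gives for free.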

(latestOffset is an exception due to the backward compatibility issue I
mentioned above, and arguably admission control should be implemented by
default. The old latestOffset signature makes no sense, and I regret that I
didn't push for this in the initial design of the streaming reader.)

I'm not sure we'd want to do this just to avoid manual pattern matching. That's
debatable, and it may not be something the two of us should discuss and decide
on our own.