This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new edbc7cf [SPARK-36533][SS][FOLLOWUP] Support Trigger.AvailableNow in PySpark edbc7cf is described below commit edbc7cf9e00233b35c057c357bf1c6b99f2ba59b Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Mon Nov 15 08:59:07 2021 +0900 [SPARK-36533][SS][FOLLOWUP] Support Trigger.AvailableNow in PySpark ### What changes were proposed in this pull request? This PR proposes to add Trigger.AvailableNow in PySpark on top of #33763. ### Why are the changes needed? We missed adding Trigger.AvailableNow in PySpark in #33763. ### Does this PR introduce _any_ user-facing change? Yes, Trigger.AvailableNow will be available in PySpark as well. ### How was this patch tested? Added simple validation in PySpark doc. Manually tested as below: ``` >>> spark.readStream.format("text").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(once=True).start() <pyspark.sql.streaming.StreamingQuery object at 0x118dff6d0> ------------------------------------------- Batch: 0 ------------------------------------------- +-----+ |value| +-----+ | a| | b| | c| | d| | e| +-----+ >>> spark.readStream.format("text").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(availableNow=True).start() <pyspark.sql.streaming.StreamingQuery object at 0x118dffe50> >>> ------------------------------------------- Batch: 0 ------------------------------------------- +-----+ |value| +-----+ | a| | b| | c| | d| | e| +-----+ >>> spark.readStream.format("text").option("maxfilespertrigger", "2").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(availableNow=True).start() <pyspark.sql.streaming.StreamingQuery object at 0x118dff820> >>> ------------------------------------------- Batch: 0 ------------------------------------------- +-----+ |value| +-----+ | a| | b| +-----+ 
------------------------------------------- Batch: 1 ------------------------------------------- +-----+ |value| +-----+ | c| | d| +-----+ ------------------------------------------- Batch: 2 ------------------------------------------- +-----+ |value| +-----+ | e| +-----+ >>> ``` Closes #34592 from HeartSaVioR/SPARK-36533-FOLLOWUP-pyspark. Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- python/pyspark/sql/streaming.py | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py index b2d06f2..53a098c 100644 --- a/python/pyspark/sql/streaming.py +++ b/python/pyspark/sql/streaming.py @@ -1005,12 +1005,17 @@ class DataStreamWriter(object): def trigger(self, *, continuous: str) -> "DataStreamWriter": ... + @overload + def trigger(self, *, availableNow: bool) -> "DataStreamWriter": + ... + def trigger( self, *, processingTime: Optional[str] = None, once: Optional[bool] = None, continuous: Optional[str] = None, + availableNow: Optional[bool] = None, ) -> "DataStreamWriter": """Set the trigger for the stream query. If this is not set it will run the query as fast as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``. @@ -1030,6 +1035,9 @@ class DataStreamWriter(object): a time interval as a string, e.g. '5 seconds', '1 minute'. Set a trigger that runs a continuous query with a given checkpoint interval. Only one trigger can be set. + availableNow : bool, optional + if set to True, set a trigger that processes all available data in multiple + batches then terminates the query. Only one trigger can be set. 
Notes ----- @@ -1043,12 +1051,14 @@ class DataStreamWriter(object): >>> writer = sdf.writeStream.trigger(once=True) >>> # trigger the query for execution every 5 seconds >>> writer = sdf.writeStream.trigger(continuous='5 seconds') + >>> # trigger the query for reading all available data with multiple batches + >>> writer = sdf.writeStream.trigger(availableNow=True) """ - params = [processingTime, once, continuous] + params = [processingTime, once, continuous, availableNow] - if params.count(None) == 3: + if params.count(None) == 4: raise ValueError("No trigger provided") - elif params.count(None) < 2: + elif params.count(None) < 3: raise ValueError("Multiple triggers not allowed.") jTrigger = None @@ -1069,7 +1079,7 @@ class DataStreamWriter(object): self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once() # type: ignore[attr-defined] ) - else: + elif continuous is not None: if type(continuous) != str or len(continuous.strip()) == 0: raise ValueError( "Value for continuous must be a non empty string. Got: %s" % continuous @@ -1078,6 +1088,12 @@ class DataStreamWriter(object): jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Continuous( # type: ignore[attr-defined] interval ) + else: + if availableNow is not True: + raise ValueError("Value for availableNow must be True. Got: %s" % availableNow) + jTrigger = ( + self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.AvailableNow() # type: ignore[attr-defined] + ) self._jwrite = self._jwrite.trigger(jTrigger) return self --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org