This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new edbc7cf  [SPARK-36533][SS][FOLLOWUP] Support Trigger.AvailableNow in PySpark
edbc7cf is described below

commit edbc7cf9e00233b35c057c357bf1c6b99f2ba59b
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Mon Nov 15 08:59:07 2021 +0900

    [SPARK-36533][SS][FOLLOWUP] Support Trigger.AvailableNow in PySpark
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to add Trigger.AvailableNow in PySpark on top of #33763.
    
    ### Why are the changes needed?
    
    We missed adding Trigger.AvailableNow in PySpark in #33763.
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, Trigger.AvailableNow will be available in PySpark as well.
    
    ### How was this patch tested?
    
    Added simple validation in PySpark doc. Manually tested as below:
    
    ```
    >>> spark.readStream.format("text").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(once=True).start()
    <pyspark.sql.streaming.StreamingQuery object at 0x118dff6d0>
    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +-----+
    |value|
    +-----+
    |    a|
    |    b|
    |    c|
    |    d|
    |    e|
    +-----+
    
    >>> spark.readStream.format("text").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(availableNow=True).start()
    <pyspark.sql.streaming.StreamingQuery object at 0x118dffe50>
    >>> -------------------------------------------
    Batch: 0
    -------------------------------------------
    +-----+
    |value|
    +-----+
    |    a|
    |    b|
    |    c|
    |    d|
    |    e|
    +-----+
    
    >>> spark.readStream.format("text").option("maxfilespertrigger", "2").load("/WorkArea/ScalaProjects/spark-apache/dist/inputs").writeStream.format("console").trigger(availableNow=True).start()
    <pyspark.sql.streaming.StreamingQuery object at 0x118dff820>
    >>> -------------------------------------------
    Batch: 0
    -------------------------------------------
    +-----+
    |value|
    +-----+
    |    a|
    |    b|
    +-----+
    
    -------------------------------------------
    Batch: 1
    -------------------------------------------
    +-----+
    |value|
    +-----+
    |    c|
    |    d|
    +-----+
    
    -------------------------------------------
    Batch: 2
    -------------------------------------------
    +-----+
    |value|
    +-----+
    |    e|
    +-----+
    
    >>>
    ```
    
    Closes #34592 from HeartSaVioR/SPARK-36533-FOLLOWUP-pyspark.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 python/pyspark/sql/streaming.py | 24 ++++++++++++++++++++----
 1 file changed, 20 insertions(+), 4 deletions(-)

diff --git a/python/pyspark/sql/streaming.py b/python/pyspark/sql/streaming.py
index b2d06f2..53a098c 100644
--- a/python/pyspark/sql/streaming.py
+++ b/python/pyspark/sql/streaming.py
@@ -1005,12 +1005,17 @@ class DataStreamWriter(object):
     def trigger(self, *, continuous: str) -> "DataStreamWriter":
         ...
 
+    @overload
+    def trigger(self, *, availableNow: bool) -> "DataStreamWriter":
+        ...
+
     def trigger(
         self,
         *,
         processingTime: Optional[str] = None,
         once: Optional[bool] = None,
         continuous: Optional[str] = None,
+        availableNow: Optional[bool] = None,
     ) -> "DataStreamWriter":
         """Set the trigger for the stream query. If this is not set it will run the query as fast
         as possible, which is equivalent to setting the trigger to ``processingTime='0 seconds'``.
@@ -1030,6 +1035,9 @@ class DataStreamWriter(object):
             a time interval as a string, e.g. '5 seconds', '1 minute'.
             Set a trigger that runs a continuous query with a given checkpoint
             interval. Only one trigger can be set.
+        availableNow : bool, optional
+            if set to True, set a trigger that processes all available data in multiple
+            batches then terminates the query. Only one trigger can be set.
 
         Notes
         -----
@@ -1043,12 +1051,14 @@ class DataStreamWriter(object):
         >>> writer = sdf.writeStream.trigger(once=True)
         >>> # trigger the query for execution every 5 seconds
         >>> writer = sdf.writeStream.trigger(continuous='5 seconds')
+        >>> # trigger the query for reading all available data with multiple batches
+        >>> writer = sdf.writeStream.trigger(availableNow=True)
         """
-        params = [processingTime, once, continuous]
+        params = [processingTime, once, continuous, availableNow]
 
-        if params.count(None) == 3:
+        if params.count(None) == 4:
             raise ValueError("No trigger provided")
-        elif params.count(None) < 2:
+        elif params.count(None) < 3:
             raise ValueError("Multiple triggers not allowed.")
 
         jTrigger = None
@@ -1069,7 +1079,7 @@ class DataStreamWriter(object):
                 self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Once()  # type: ignore[attr-defined]
             )
 
-        else:
+        elif continuous is not None:
             if type(continuous) != str or len(continuous.strip()) == 0:
                 raise ValueError(
                     "Value for continuous must be a non empty string. Got: %s" % continuous
@@ -1078,6 +1088,12 @@ class DataStreamWriter(object):
             jTrigger = self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.Continuous(  # type: ignore[attr-defined]
                 interval
             )
+        else:
+            if availableNow is not True:
+                raise ValueError("Value for availableNow must be True. Got: %s" % availableNow)
+            jTrigger = (
+                self._spark._sc._jvm.org.apache.spark.sql.streaming.Trigger.AvailableNow()  # type: ignore[attr-defined]
+            )
 
         self._jwrite = self._jwrite.trigger(jTrigger)
         return self

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to