HeartSaVioR commented on code in PR #54085:
URL: https://github.com/apache/spark/pull/54085#discussion_r2758940103


##########
python/pyspark/sql/streaming/datasource.py:
##########
@@ -0,0 +1,225 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABC, abstractmethod
+
+
+class ReadLimit(ABC):
+    """
+    Specifies limits on how much data to read from a streaming source when
+    determining the latest offset.
+    """
+
+    @classmethod
+    @abstractmethod
+    def type_name(cls) -> str:
+        """
+        The name of this :class:`ReadLimit` type. This is used to register the type in the registry.
+
+        Returns
+        -------
+        str
+            The name of this :class:`ReadLimit` type.
+        """
+        pass
+
+    @classmethod
+    @abstractmethod
+    def load(cls, params: dict) -> "ReadLimit":
+        """
+        Create an instance of :class:`ReadLimit` from parameters.
+
+        Parameters
+        ----------
+        params : dict
+            The parameters to create the :class:`ReadLimit`. The type name is not included.
+
+        Returns
+        -------
+        ReadLimit
+            The created :class:`ReadLimit` instance.
+        """
+        pass
+
+    def dump(self) -> dict:
+        """
+        Serialize this :class:`ReadLimit` instance. Implementations of :class:`ReadLimit`
+        are expected not to override this method directly but rather to implement the
+        :meth:`_dump()` method.
+
+        Returns
+        -------
+        dict
+            A dictionary containing the serialized parameters of this :class:`ReadLimit`,
+            including the type name.
+        """
+        params = self._dump()
+        params.update({"type": self.type_name()})
+        return params
+
+    @abstractmethod
+    def _dump(self) -> dict:
+        """
+        Serialize the parameters specific to this :class:`ReadLimit` instance. Implementations
+        of :class:`ReadLimit` are expected to implement this method to handle their specific
+        parameters. The type name will be handled in the :meth:`dump()` method.
+
+        Returns
+        -------
+        dict
+            A dictionary containing the serialized parameters of this :class:`ReadLimit`,
+            excluding the type name.
+        """
+        pass
+
+
+class ReadAllAvailable(ReadLimit):
+    """
+    A :class:`ReadLimit` indicating that all available data should be read, regardless of the
+    given source options.
+    """
+
+    @classmethod
+    def type_name(cls) -> str:
+        return "ReadAllAvailable"
+
+    @classmethod
+    def load(cls, params: dict) -> "ReadAllAvailable":
+        return ReadAllAvailable()
+
+    def _dump(self) -> dict:
+        return {}
+
+
+class ReadMinRows(ReadLimit):
+    """
+    A :class:`ReadLimit` indicating that a minimum of N rows should be read. If fewer than N rows
+    are available to read, the source should skip producing a new offset and wait until more
+    data arrives.
+
+    Note that this semantic does not work properly with Trigger.AvailableNow, since the source
+    may end up waiting forever for more data to arrive. It is the source's responsibility to
+    handle this case properly.
+    """
+
+    def __init__(self, min_rows: int) -> None:
+        self.min_rows = min_rows
+
+    @classmethod
+    def type_name(cls) -> str:
+        return "ReadMinRows"
+
+    @classmethod
+    def load(cls, params: dict) -> "ReadMinRows":
+        return ReadMinRows(params["min_rows"])
+
+    def _dump(self) -> dict:
+        return {"min_rows": self.min_rows}
+
+
+class ReadMaxRows(ReadLimit):
+    """
+    A :class:`ReadLimit` indicating that at most N rows should be read. The source should not
+    read more than N rows when determining the latest offset.
+    """
+
+    def __init__(self, max_rows: int) -> None:
+        self.max_rows = max_rows
+
+    @classmethod
+    def type_name(cls) -> str:
+        return "ReadMaxRows"
+
+    @classmethod
+    def load(cls, params: dict) -> "ReadMaxRows":
+        return ReadMaxRows(params["max_rows"])
+
+    def _dump(self) -> dict:
+        return {"max_rows": self.max_rows}
+
+
+class ReadMaxFiles(ReadLimit):
+    """
+    A :class:`ReadLimit` indicating that at most N files should be read. The source should not
+    read more than N files when determining the latest offset.
+    """
+
+    def __init__(self, max_files: int) -> None:
+        self.max_files = max_files
+
+    @classmethod
+    def type_name(cls) -> str:
+        return "ReadMaxFiles"
+
+    @classmethod
+    def load(cls, params: dict) -> "ReadMaxFiles":
+        return ReadMaxFiles(params["max_files"])
+
+    def _dump(self) -> dict:
+        return {"max_files": self.max_files}
+
+
+class ReadMaxBytes(ReadLimit):
+    """
+    A :class:`ReadLimit` indicating that at most N bytes should be read. The source should not
+    read more than N bytes when determining the latest offset.
+    """
+
+    def __init__(self, max_bytes: int) -> None:
+        self.max_bytes = max_bytes
+
+    @classmethod
+    def type_name(cls) -> str:
+        return "ReadMaxBytes"
+
+    @classmethod
+    def load(cls, params: dict) -> "ReadMaxBytes":
+        return ReadMaxBytes(params["max_bytes"])
+
+    def _dump(self) -> dict:
+        return {"max_bytes": self.max_bytes}
+
+
+class SupportsTriggerAvailableNow(ABC):
+    """
+    A mixin interface for streaming sources that support Trigger.AvailableNow. This interface
+    can be added to both :class:`DataSourceStreamReader` and :class:`SimpleDataSourceStreamReader`.
+    """
+
+    @abstractmethod
+    def prepareForTriggerAvailableNow(self) -> None:
+        """
+        This will be called at the beginning of a streaming query with Trigger.AvailableNow, to
+        let the source record the offset of the latest data available at that time (a.k.a. the
+        target offset for the query). The source must behave as if no new data arrives after the
+        target offset, i.e., the source must not return an offset higher than the target offset
+        when :meth:`DataSourceStreamReader.latestOffset()` is called.
+
+        The source can extend the semantic of "current latest data" based on its own logic, but
+        the extended semantic must not violate the expectation that the source will not read any
+        data added after the time this method was called.
+
+        Note that it is the source's responsibility to ensure that calling
+        :meth:`DataSourceStreamReader.latestOffset()` or :meth:`SimpleDataSourceStreamReader.read()`
+        after calling this method will eventually reach the target offset, and finally return the
+        same offset as the given start parameter, to indicate that there is no more data to read.
+        This includes the case where the query is restarted and the source is asked to read from the

Review Comment:
   
https://github.com/apache/spark/commit/4575b1da9e1d7c39c3769f2c53b213a5e5a5407d
   
   So here is the scenario: the query reads from a Kafka topic. In the first run, the topic has 3 partitions. During the downtime of the query, users repartition the Kafka topic so that it now has 5 partitions. If there was an uncommitted batch, the second run of the query will get the `start` offset from the uncommitted batch, which only covers 3 partitions. Meanwhile, prepareForTriggerAvailableNow() will identify that there are 5 partitions and store the offset for 5 partitions. The source is responsible for reading further from the 3 partitions, discovering the new partitions, and eventually reaching the offset stored by prepareForTriggerAvailableNow().
   
   The scenario is actually complicated and I might not be able to describe it in an easy-to-understand way. If there is a proposal for better wording, I appreciate the suggestion!
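   
   To make the scenario a bit more concrete, here is a rough sketch using a hypothetical partition-to-offset dict as the offset format (the layout and numbers are illustrative only, not the actual Kafka source):
   
   ```python
   # First run: the topic has 3 partitions; the uncommitted batch recorded this start offset.
   start = {"0": 100, "1": 80, "2": 95}
   
   # Second run, after repartitioning to 5 partitions: prepareForTriggerAvailableNow()
   # snapshots the current latest data, so the stored target offset covers 5 partitions.
   target = {"0": 150, "1": 120, "2": 130, "3": 40, "4": 55}
   
   # The source must keep advancing from `start`, discover partitions "3" and "4" on its
   # own, and make latestOffset(start, limit) eventually return `target` (and then keep
   # returning that same offset) so the Trigger.AvailableNow query can terminate.
   ```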



##########
python/pyspark/sql/datasource.py:
##########
@@ -714,9 +715,37 @@ def initialOffset(self) -> dict:
             messageParameters={"feature": "initialOffset"},
         )
 
-    def latestOffset(self) -> dict:
+    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
         """
-        Returns the most recent offset available.
+        Returns the most recent offset available given a read limit. The start offset can be
+        used to figure out how much new data should be read given the limit.
+
+        The `start` will be provided from the return value of :meth:`initialOffset()` for the
+        very first micro-batch, and from the offset of the last micro-batch for the following
+        ones. The source can return the same offset as the start offset if there is no data to
+        process.
+
+        :class:`ReadLimit` can be used by the source to limit the amount of data returned in
+        this call. The implementation should implement :meth:`getDefaultReadLimit()` to provide
+        the proper :class:`ReadLimit` if the source can limit the amount of data returned based
+        on the source options.
+
+        The engine can still call :meth:`latestOffset()` with :class:`ReadAllAvailable` even if
+        the source produces a different read limit from :meth:`getDefaultReadLimit()`, to respect
+        the semantic of the trigger. The source must always respect the read limit provided by
+        the engine; e.g. if the read limit is :class:`ReadAllAvailable`, the source must ignore
+        the read limit configured through options.

Review Comment:
   It's for two cases: 1) Trigger.Once (deprecated), and 2) the fallback of Trigger.AvailableNow (when any stream does not support Trigger.AvailableNow but Trigger.AvailableNow is requested). I'm not very sure we would like to document these cases, as once we document them they are considered a "contract" rather than an implementation detail.
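   
   Regardless of whether those cases get documented, here is a minimal sketch of what "respect the given read limit" could look like on the implementation side (the class, the `maxRowsPerBatch` option, and the `_snapshot_latest` / `_advance_by_rows` helpers are hypothetical; the `ReadLimit` classes and the `getDefaultReadLimit()` hook are the ones referenced in this PR):
   
   ```python
   from pyspark.sql.datasource import DataSourceStreamReader
   from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadLimit, ReadMaxRows
   
   
   class MyStreamReader(DataSourceStreamReader):
       def __init__(self, options: dict) -> None:
           self._max_rows = int(options.get("maxRowsPerBatch", 1000))
   
       def getDefaultReadLimit(self) -> ReadLimit:
           # Advertise the option-based limit as the default.
           return ReadMaxRows(self._max_rows)
   
       def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
           if isinstance(limit, ReadAllAvailable):
               # e.g. Trigger.Once or the AvailableNow fallback: the engine overrides
               # the default limit, so the option-based cap must be ignored here.
               return self._snapshot_latest()
           if isinstance(limit, ReadMaxRows):
               return self._advance_by_rows(start, limit.max_rows)
           return self._snapshot_latest()
   ```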



##########
python/pyspark/sql/datasource.py:
##########
@@ -714,9 +715,37 @@ def initialOffset(self) -> dict:
             messageParameters={"feature": "initialOffset"},
         )
 
-    def latestOffset(self) -> dict:
+    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
         """
-        Returns the most recent offset available.
+        Returns the most recent offset available given a read limit. The start offset can be
+        used to figure out how much new data should be read given the limit.
+
+        The `start` will be provided from the return value of :meth:`initialOffset()` for the
+        very first micro-batch, and from the offset of the last micro-batch for the following
+        ones. The source can return the same offset as the start offset if there is no data to

Review Comment:
   I meant the parameter.
   
   "The source can return the `start` parameter as it is, if there is no data to process"
   
   ^ Would it be clearer?
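   
   In code terms, the suggested behavior would look roughly like the sketch below (`_has_new_data_after` is just a hypothetical helper for illustration):
   
   ```python
   def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
       if not self._has_new_data_after(start):
           # No new data: hand the `start` parameter back unchanged, so the engine
           # knows there is nothing to plan for this micro-batch.
           return start
       ...
   ```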



##########
python/pyspark/sql/datasource.py:
##########
@@ -714,9 +715,37 @@ def initialOffset(self) -> dict:
             messageParameters={"feature": "initialOffset"},
         )
 
-    def latestOffset(self) -> dict:
+    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
         """
-        Returns the most recent offset available.
+        Returns the most recent offset available given a read limit. The start offset can be
+        used to figure out how much new data should be read given the limit.
+
+        The `start` will be provided from the return value of :meth:`initialOffset()` for the
+        very first micro-batch, and from the offset of the last micro-batch for the following
+        ones. The source can return the same offset as the start offset if there is no data to

Review Comment:
   I'm slightly hesitant to couple the contract with the engine's behavior, since it could limit us. But the explanation in your comment is unlikely to change, so it makes sense to me. Thanks for the suggestion.
   


