HeartSaVioR commented on code in PR #54085:
URL: https://github.com/apache/spark/pull/54085#discussion_r2758940103
##########
python/pyspark/sql/streaming/datasource.py:
##########
@@ -0,0 +1,225 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABC, abstractmethod
+
+
+class ReadLimit(ABC):
+    """
+    Specifies limits on how much data to read from a streaming source when
+    determining the latest offset.
+    """
+
+    @classmethod
+    @abstractmethod
+    def type_name(cls) -> str:
+        """
+        The name of this :class:`ReadLimit` type. This is used to register the type into the
+        registry.
+
+        Returns
+        -------
+        str
+            The name of this :class:`ReadLimit` type.
+        """
+        pass
+
+    @classmethod
+    @abstractmethod
+    def load(cls, params: dict) -> "ReadLimit":
+        """
+        Create an instance of :class:`ReadLimit` from parameters.
+
+        Parameters
+        ----------
+        params : dict
+            The parameters to create the :class:`ReadLimit`. The type name isn't included.
+
+        Returns
+        -------
+        ReadLimit
+            The created :class:`ReadLimit` instance.
+        """
+        pass
+
+    def dump(self) -> dict:
+        """
+        Method to serialize this :class:`ReadLimit` instance. Implementations of
+        :class:`ReadLimit` are expected to not implement this method directly and rather
+        implement the :meth:`_dump()` method.
+
+        Returns
+        -------
+        dict
+            A dictionary containing the serialized parameters of this :class:`ReadLimit`,
+            including the type name.
+        """
+        params = self._dump()
+        params.update({"type": self.type_name()})
+        return params
+
+    @abstractmethod
+    def _dump(self) -> dict:
+        """
+        Method to serialize this :class:`ReadLimit` instance. Implementations of
+        :class:`ReadLimit` are expected to implement this method to handle their specific
+        parameters. The type name will be handled in the :meth:`dump()` method.
+
+        Returns
+        -------
+        dict
+            A dictionary containing the serialized parameters of this :class:`ReadLimit`,
+            excluding the type name.
+        """
+        pass
+
+
+class ReadAllAvailable(ReadLimit):
+    """
+    A :class:`ReadLimit` that indicates to read all available data, regardless of the given
+    source options.
+    """
+
+    @classmethod
+    def type_name(cls) -> str:
+        return "ReadAllAvailable"
+
+    @classmethod
+    def load(cls, params: dict) -> "ReadAllAvailable":
+        return ReadAllAvailable()
+
+    def _dump(self) -> dict:
+        return {}
+
+
+class ReadMinRows(ReadLimit):
+    """
+    A :class:`ReadLimit` that indicates to read a minimum of N rows. If there are fewer than
+    N rows available to read, the source should skip producing a new offset and wait until
+    more data arrives.
+
+    Note that this semantic does not work properly with Trigger.AvailableNow, since the source
+    may end up waiting forever for more data to arrive. It is the source's responsibility to
+    handle this case properly.
+ """ + + def __init__(self, min_rows: int) -> None: + self.min_rows = min_rows + + @classmethod + def type_name(cls) -> str: + return "ReadMinRows" + + @classmethod + def load(cls, params: dict) -> "ReadMinRows": + return ReadMinRows(params["min_rows"]) + + def _dump(self) -> dict: + return {"min_rows": self.min_rows} + + +class ReadMaxRows(ReadLimit): + """ + A :class:`ReadLimit` that indicates to read maximum N rows. The source should not read more + than N rows when determining the latest offset. + """ + + def __init__(self, max_rows: int) -> None: + self.max_rows = max_rows + + @classmethod + def type_name(cls) -> str: + return "ReadMaxRows" + + @classmethod + def load(cls, params: dict) -> "ReadMaxRows": + return ReadMaxRows(params["max_rows"]) + + def _dump(self) -> dict: + return {"max_rows": self.max_rows} + + +class ReadMaxFiles(ReadLimit): + """ + A :class:`ReadLimit` that indicates to read maximum N files. The source should not read more + than N files when determining the latest offset. + """ + + def __init__(self, max_files: int) -> None: + self.max_files = max_files + + @classmethod + def type_name(cls) -> str: + return "ReadMaxFiles" + + @classmethod + def load(cls, params: dict) -> "ReadMaxFiles": + return ReadMaxFiles(params["max_files"]) + + def _dump(self) -> dict: + return {"max_files": self.max_files} + + +class ReadMaxBytes(ReadLimit): + """ + A :class:`ReadLimit` that indicates to read maximum N bytes. The source should not read more + than N bytes when determining the latest offset. + """ + + def __init__(self, max_bytes: int) -> None: + self.max_bytes = max_bytes + + @classmethod + def type_name(cls) -> str: + return "ReadMaxBytes" + + @classmethod + def load(cls, params: dict) -> "ReadMaxBytes": + return ReadMaxBytes(params["max_bytes"]) + + def _dump(self) -> dict: + return {"max_bytes": self.max_bytes} + + +class SupportsTriggerAvailableNow(ABC): + """ + A mixin interface for streaming sources that support Trigger.AvailableNow. This interface can + be added to both :class:`DataSourceStreamReader` and :class:`SimpleDataSourceStreamReader`. + """ + + @abstractmethod + def prepareForTriggerAvailableNow(self) -> None: + """ + This will be called at the beginning of streaming queries with Trigger.AvailableNow, to let + the source record the offset for the current latest data at the time (a.k.a the target + offset for the query). The source must behave as if there is no new data coming in after + the target offset, i.e., the source must not return an offset higher than the target offset + when :meth:`DataSourceStreamReader.latestOffset()` is called. + + The source can extend the semantic of "current latest data" based on its own logic, but the + extended semantic must not violate the expectation that the source will not read any data + which is added later than the time this method has called. + + Note that it is the source's responsibility to ensure that calling + :meth:`DataSourceStreamReader.latestOffset()` or :meth:`SimpleDataSourceStreamReader.read()` + after calling this method will eventually reach the target offset, and finally returns the + same offset as given start parameter, to indicate that there is no more data to read. This + includes the case where the query is restarted and the source is asked to read from the Review Comment: https://github.com/apache/spark/commit/4575b1da9e1d7c39c3769f2c53b213a5e5a5407d So here is the scenario - the query reads from Kafka topic. In the first run, the topic has 3 partitions. 
##########
python/pyspark/sql/datasource.py:
##########
@@ -714,9 +715,37 @@ def initialOffset(self) -> dict:
             messageParameters={"feature": "initialOffset"},
         )
 
-    def latestOffset(self) -> dict:
+    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
         """
-        Returns the most recent offset available.
+        Returns the most recent offset available given a read limit. The start offset can be used
+        to figure out how much new data should be read given the limit.
+
+        The `start` will be provided from the return value of :meth:`initialOffset()` for
+        the very first micro-batch, and the offset continues from the last micro-batch for the
+        following. The source can return the same offset as start offset if there is no data to
+        process.
+
+        :class:`ReadLimit` can be used by the source to limit the amount of data returned in this
+        call. The implementation should implement :meth:`getDefaultReadLimit()` to provide the
+        proper :class:`ReadLimit` if the source can limit the amount of data returned based on the
+        source options.
+
+        The engine can still call :meth:`latestOffset()` with :class:`ReadAllAvailable` even if the
+        source produces the different read limit from :meth:`getDefaultReadLimit()`, to respect the
+        semantic of trigger. The source must always respect the given readLimit provided by the
+        engine; e.g. if the readLimit is :class:`ReadAllAvailable`, the source must ignore the read
+        limit configured through options.

Review Comment:
   It's for two cases: 1) Trigger.Once (deprecated), and 2) the fallback of Trigger.AvailableNow (when any stream does not support Trigger.AvailableNow and Trigger.AvailableNow is requested). I'm not very sure we would like to document these cases, as once we document them they are considered a "contract" rather than an implementation detail.

##########
python/pyspark/sql/datasource.py:
##########
@@ -714,9 +715,37 @@ def initialOffset(self) -> dict:
             messageParameters={"feature": "initialOffset"},
         )
 
-    def latestOffset(self) -> dict:
+    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
         """
-        Returns the most recent offset available.
+        Returns the most recent offset available given a read limit. The start offset can be used
+        to figure out how much new data should be read given the limit.
+
+        The `start` will be provided from the return value of :meth:`initialOffset()` for
+        the very first micro-batch, and the offset continues from the last micro-batch for the
+        following. The source can return the same offset as start offset if there is no data to

Review Comment:
   I meant the parameter. "The source can return the `start` parameter as it is, if there is no data to process"
   
   ^ Would it be clearer?
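To illustrate the contract discussed in these threads, below is a minimal, hypothetical sketch of a stream reader whose offset is a single counter. The class name, the option name, and the offset layout are invented for illustration, and the `getDefaultReadLimit()` signature is assumed from the docstring above; the sketch only shows how `start`, `ReadMaxRows`, and `ReadAllAvailable` might interact:

```python
from pyspark.sql.streaming.datasource import ReadAllAvailable, ReadLimit, ReadMaxRows


class CounterStreamReader:  # hypothetical reader; its offset is {"index": <rows produced so far>}
    def __init__(self, options: dict) -> None:
        # Optional per-batch cap configured through a made-up source option.
        self._max_rows_per_batch = int(options.get("maxRowsPerBatch", 0))

    def initialOffset(self) -> dict:
        return {"index": 0}

    def getDefaultReadLimit(self) -> ReadLimit:
        # Advertise the configured cap; otherwise ask to read everything.
        if self._max_rows_per_batch > 0:
            return ReadMaxRows(self._max_rows_per_batch)
        return ReadAllAvailable()

    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
        available = self._current_end_index()  # whatever the source can see right now
        if isinstance(limit, ReadMaxRows):
            # Plan at most limit.max_rows rows past the start offset.
            return {"index": min(available, start["index"] + limit.max_rows)}
        # ReadAllAvailable (e.g. Trigger.Once or the AvailableNow fallback mentioned above):
        # ignore the option-configured cap and return everything currently available.
        # If nothing new arrived, this equals `start`, i.e. there is no data to process.
        return {"index": available}

    def _current_end_index(self) -> int:
        ...  # source-specific discovery of the latest position; omitted in this sketch
```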
##########
python/pyspark/sql/datasource.py:
##########
@@ -714,9 +715,37 @@ def initialOffset(self) -> dict:
             messageParameters={"feature": "initialOffset"},
         )
 
-    def latestOffset(self) -> dict:
+    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
         """
-        Returns the most recent offset available.
+        Returns the most recent offset available given a read limit. The start offset can be used
+        to figure out how much new data should be read given the limit.
+
+        The `start` will be provided from the return value of :meth:`initialOffset()` for
+        the very first micro-batch, and the offset continues from the last micro-batch for the
+        following. The source can return the same offset as start offset if there is no data to

Review Comment:
   I'm slightly not in favor of coupling the contract with the engine's behavior, since it could limit us. But the explanation from your comment is unlikely to change, so it makes sense to me. Thanks for the suggestion.

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
