HeartSaVioR opened a new pull request, #54085:
URL: https://github.com/apache/spark/pull/54085

   ### What changes were proposed in this pull request?
   
   This PR adds support for Admission Control and `Trigger.AvailableNow` to 
the streaming reader of the Python data source API.
   
   To support Admission Control, we propose changing the 
`DataSourceStreamReader` interface as follows (the table gives a side-by-side 
comparison):
   
   | **Before** | **After** |
   | :---: | :---: |
   | `class DataSourceStreamReader(ABC):` | `class DataSourceStreamReader(ABC):` |
   | `def initialOffset(self) -> dict` | `def initialOffset(self) -> dict` |
   | `def latestOffset(self) -> dict` | `def latestOffset(self, start: dict, limit: ReadLimit) -> dict` |
   |  | `# NOTE: Optional to implement, default = ReadAllAvailable()` |
   |  | `def getDefaultReadLimit(self) -> ReadLimit` |
   |  | `# NOTE: Optional to implement, default = None` |
   |  | `def reportLatestOffset(self) -> Optional[dict]` |
   | `def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]` | `def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]` |
   | `@abstractmethod def read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator["RecordBatch"]]` | `@abstractmethod def read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator["RecordBatch"]]` |
   | `def commit(self, end: dict) -> None` | `def commit(self, end: dict) -> None` |
   | `def stop(self) -> None` | `def stop(self) -> None` |
   
   The main changes are as follows:
   
   * The method signature for `latestOffset` is changed. The method is 
mandatory.
   * The method `getDefaultReadLimit` is added, as optional.
   * The method `reportLatestOffset` is added, as optional.
   
   This way, new implementations support Admission Control by default. The 
engine still handles the old method signature by inspecting the implementation 
with Python's built-in `inspect` module (similar to Java's reflection). If 
`latestOffset` is implemented without parameters, the engine falls back to 
treating the source as one that does not support admission control. For all new 
sources, implementing `latestOffset` with parameters is strongly recommended.
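
The signature check described above could be sketched as follows. This is only 
an illustration, not the code in this PR: `LegacyReader`, 
`AdmissionControlReader` and `supports_admission_control` are hypothetical 
names, and the actual detection logic in the engine may differ.

```python
import inspect


class LegacyReader:
    # Old-style signature: no parameters besides self.
    def latestOffset(self) -> dict:
        return {"offset": 10}


class AdmissionControlReader:
    # New-style signature: receives the start offset and a read limit.
    def latestOffset(self, start: dict, limit: object) -> dict:
        return {"offset": start["offset"] + 5}


def supports_admission_control(reader: object) -> bool:
    """Hypothetical helper: True if latestOffset accepts (start, limit)."""
    # The signature of a bound method does not include `self`.
    params = inspect.signature(reader.latestOffset).parameters
    return len(params) >= 2
```

With this helper, a `LegacyReader` instance is classified as not supporting 
admission control, while an `AdmissionControlReader` instance is.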
   
   The `ReadLimit` interface and its built-in implementations will be available 
for source implementations to use. The built-in implementations are 
`ReadAllAvailable`, `ReadMinRows`, `ReadMaxRows`, `ReadMaxFiles`, and 
`ReadMaxBytes`. We won't support custom implementations of the `ReadLimit` 
interface at this point, since doing so requires major effort and we don't see 
demand for it yet, but we can plan for it if strong demand emerges.
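
As a rough sketch of how a source might honor a read limit in `latestOffset`, 
consider the example below. The `ReadLimit` classes here are local stand-ins 
defined only so the snippet runs on its own; the `max_rows` field name, the 
`CounterStreamReader` class, and its offset layout are assumptions, not the API 
added by this PR.

```python
from dataclasses import dataclass


class ReadLimit:
    """Stand-in for the ReadLimit interface (assumption, not the real class)."""


@dataclass
class ReadAllAvailable(ReadLimit):
    pass


@dataclass
class ReadMaxRows(ReadLimit):
    max_rows: int  # assumed field name


class CounterStreamReader:
    """Hypothetical source whose offsets are plain row counts."""

    def __init__(self, available_rows: int) -> None:
        self.available_rows = available_rows

    def initialOffset(self) -> dict:
        return {"row": 0}

    def getDefaultReadLimit(self) -> ReadLimit:
        # Optional method: cap each micro-batch at 100 rows by default.
        return ReadMaxRows(max_rows=100)

    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
        if isinstance(limit, ReadMaxRows):
            # Advance by at most max_rows, never past what is available.
            end = min(start["row"] + limit.max_rows, self.available_rows)
        else:
            # ReadAllAvailable (or an unhandled limit): read everything.
            end = self.available_rows
        return {"row": end}

    def reportLatestOffset(self) -> dict:
        # Optional method: the true latest offset, ignoring any limit.
        return {"row": self.available_rows}
```

With 250 rows available and a limit of 100 rows, successive calls would end at 
rows 100, 200, and 250, while `reportLatestOffset` keeps reporting 250.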
   
   We do not change `SimpleDataSourceStreamReader` for Admission Control, 
since it is designed for small data fetches and can be considered to limit the 
data already. We could still add `ReadLimit` support later if we see strong 
demand for limiting the fetch size via a source option.
   
   To support `Trigger.AvailableNow`, we propose introducing a new interface, 
as follows:
   
   ```
   from abc import ABC, abstractmethod

   class SupportsTriggerAvailableNow(ABC):
       @abstractmethod
       def prepareForTriggerAvailableNow(self) -> None: ...
   ```
   
   The above interface can be mixed in with both `DataSourceStreamReader` and 
`SimpleDataSourceStreamReader`. As mentioned above, it won't work with 
`DataSourceStreamReader` implementations that use the old method signature of 
`latestOffset()`.
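
A minimal sketch of such a mixin in use is shown below. The interface is 
redefined locally so the snippet is self-contained, and `SnapshotReader` with 
its `available` and `snapshot` attributes is hypothetical. A real reader would 
also extend `DataSourceStreamReader`. The sketch assumes the Python method 
follows the Scala semantics, where the source records its latest offset when 
the method is called and does not read past it.

```python
from abc import ABC, abstractmethod
from typing import Optional


class SupportsTriggerAvailableNow(ABC):
    @abstractmethod
    def prepareForTriggerAvailableNow(self) -> None: ...


class SnapshotReader(SupportsTriggerAvailableNow):
    """Hypothetical reader whose offsets are plain row counts."""

    def __init__(self) -> None:
        self.available = 0                   # rows currently in the source
        self.snapshot: Optional[int] = None  # bound fixed for AvailableNow

    def prepareForTriggerAvailableNow(self) -> None:
        # Freeze the upper bound at the data available right now.
        self.snapshot = self.available

    def latestOffset(self, start: dict, limit: object) -> dict:
        # Once a snapshot exists, never report an offset beyond it.
        upper = self.available if self.snapshot is None else self.snapshot
        return {"row": upper}
```

Data arriving after `prepareForTriggerAvailableNow` is called stays outside 
the frozen bound, so the query can finish once it reaches the snapshot.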
   
   ### Why are the changes needed?
   
   This catches up with features already supported in the Scala DSv2 API. 
Developers have reported that the missing features block them from implementing 
some data sources.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. Users implementing a streaming reader via the Python data source API 
will be able to add support for Admission Control and `Trigger.AvailableNow`, 
both of which had been major missing features.
   
   ### How was this patch tested?
   
   New UTs.
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Co-authored using claude-4.5-sonnet


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

