HeartSaVioR opened a new pull request, #54085:
URL: https://github.com/apache/spark/pull/54085
### What changes were proposed in this pull request?
This PR proposes adding support for Admission Control and
Trigger.AvailableNow to the Python data source streaming reader.
To support Admission Control, we propose changing the `DataSourceStreamReader`
interface as follows (shown as a side-by-side comparison):
| **Before** | **After** |
| :---: | :---: |
| `class DataSourceStreamReader(ABC):` | `class DataSourceStreamReader(ABC):` |
| `def initialOffset(self) -> dict` | `def initialOffset(self) -> dict` |
| `def latestOffset() -> dict` | `def latestOffset(self, start: dict, limit: ReadLimit) -> dict` |
| | `# NOTE: Optional to implement, default = ReadAllAvailable()` |
| | `def getDefaultReadLimit(self) -> ReadLimit` |
| | `# NOTE: Optional to implement, default = None` |
| | `def reportLatestOffset(self) -> Optional[dict]` |
| `def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]` | `def partitions(self, start: dict, end: dict) -> Sequence[InputPartition]` |
| `@abstractmethod def read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator["RecordBatch"]]` | `@abstractmethod def read(self, partition: InputPartition) -> Union[Iterator[Tuple], Iterator["RecordBatch"]]` |
| `def commit(self, end: dict) -> None` | `def commit(self, end: dict) -> None` |
| `def stop(self) -> None` | `def stop(self) -> None` |
The main changes are as follows:
* The method signature of `latestOffset` is changed. The method is
mandatory.
* The method `getDefaultReadLimit` is added as an optional method.
* The method `reportLatestOffset` is added as an optional method.
This way, new implementations support Admission Control by default. The
engine handles the old method signature via Python's built-in `inspect` module
(similar to Java's reflection): if `latestOffset` is implemented without
parameters, we fall back to treating the source as one that does not enable
admission control. For all new sources, implementing `latestOffset` with
parameters is strongly recommended.
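The signature-based fallback described above could look roughly like the following. This is only a sketch of the idea, not the code in this PR: the helper names `supports_admission_control` and `call_latest_offset` and the two toy reader classes are hypothetical, and the parameter-count check is an assumed detection rule.

```python
import inspect


class LegacyReader:
    # Old-style signature: no admission-control parameters.
    def latestOffset(self) -> dict:
        return {"offset": 10}


class NewReader:
    # New-style signature proposed in this PR.
    def latestOffset(self, start: dict, limit: object) -> dict:
        return {"offset": start["offset"] + 5}


def supports_admission_control(reader: object) -> bool:
    """Hypothetical helper: True if latestOffset accepts (start, limit)."""
    # inspect.signature on a bound method already excludes `self`.
    params = inspect.signature(reader.latestOffset).parameters
    return len(params) >= 2


def call_latest_offset(reader: object, start: dict, limit: object) -> dict:
    """Hypothetical dispatcher: pick the call shape the reader implements."""
    if supports_admission_control(reader):
        return reader.latestOffset(start, limit)
    # Old signature: the source decides the end offset with no limit applied.
    return reader.latestOffset()
```

With this shape, existing readers keep working unchanged, while readers that adopt the new signature receive the start offset and the read limit.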
The `ReadLimit` interface and its built-in implementations will be available
for source implementations to use. The built-in implementations are as follows:
`ReadAllAvailable`, `ReadMinRows`, `ReadMaxRows`, `ReadMaxFiles`,
`ReadMaxBytes`. We won't support custom implementations of the `ReadLimit`
interface at this point, since that requires major effort and we don't see
demand for it, but we can plan for it if strong demand emerges.
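As an illustration of how a source might honor a read limit, here is a minimal sketch of a reader using the new `latestOffset` signature. The `ReadLimit`, `ReadAllAvailable`, and `ReadMaxRows` classes below are local stand-ins so the snippet runs on its own; the field name `max_rows`, the toy `CounterStreamReader`, and its offset layout are assumptions, not the API shipped by this PR.

```python
from dataclasses import dataclass


class ReadLimit:
    """Local stand-in for the ReadLimit interface (assumption)."""


@dataclass(frozen=True)
class ReadAllAvailable(ReadLimit):
    pass


@dataclass(frozen=True)
class ReadMaxRows(ReadLimit):
    max_rows: int  # assumed field name, for illustration only


class CounterStreamReader:
    """Toy source whose data is the integers [0, available)."""

    def __init__(self, available: int) -> None:
        self.available = available

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def getDefaultReadLimit(self) -> ReadLimit:
        # Optional method: cap each micro-batch at 100 rows by default.
        return ReadMaxRows(max_rows=100)

    def latestOffset(self, start: dict, limit: ReadLimit) -> dict:
        end = self.available
        if isinstance(limit, ReadMaxRows):
            # Admit at most max_rows rows past the start offset.
            end = min(end, start["offset"] + limit.max_rows)
        return {"offset": end}

    def reportLatestOffset(self) -> dict:
        # Optional method: the true latest offset, ignoring any limit.
        return {"offset": self.available}
```

With 250 rows available, this reader returns offset 100 for the first batch under its default limit, while `ReadAllAvailable()` yields the full range in one batch.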
We do not make any change to `SimpleDataSourceStreamReader` for Admission
Control, since it is designed for small data fetches and can be considered to
already limit the data. We could still add `ReadLimit` support later if we see
strong demand for limiting the fetch size via a source option.
To support `Trigger.AvailableNow`, we propose introducing a new interface
as follows:
```
from abc import ABC, abstractmethod

class SupportsTriggerAvailableNow(ABC):
    @abstractmethod
    def prepareForTriggerAvailableNow(self) -> None:
        ...
```
The above interface can be mixed in with both `DataSourceStreamReader` and
`SimpleDataSourceStreamReader`. It won't work with `DataSourceStreamReader`
implementations that use the old `latestOffset()` method signature, as
noted above.
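A reader mixing in this interface might look like the sketch below. It assumes the same contract as the Scala API, where the source records the data available when `prepareForTriggerAvailableNow` is called and never reports offsets past that point. The ABC is redefined locally so the snippet is self-contained, and `QueueStreamReader` with its `available` counter is hypothetical.

```python
from abc import ABC, abstractmethod
from typing import Optional


class SupportsTriggerAvailableNow(ABC):
    # Local stand-in mirroring the interface proposed above.
    @abstractmethod
    def prepareForTriggerAvailableNow(self) -> None:
        ...


class QueueStreamReader(SupportsTriggerAvailableNow):
    """Toy reader over a growing counter of available rows."""

    def __init__(self) -> None:
        self.available = 0
        self._target: Optional[int] = None

    def prepareForTriggerAvailableNow(self) -> None:
        # Snapshot what is available right now; later arrivals are ignored.
        self._target = self.available

    def latestOffset(self, start: dict, limit: object) -> dict:
        end = self.available
        if self._target is not None:
            # Never advance past the snapshot taken at preparation time.
            end = min(end, self._target)
        return {"offset": end}
```

Capping at the snapshot lets the query finish once the data present at start has been processed, even if the source keeps receiving new rows.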
### Why are the changes needed?
This catches up with features already supported in the Scala DSv2 API. We have
received reports from developers that the missing features block them from
implementing some data sources.
### Does this PR introduce _any_ user-facing change?
Yes. Users implementing a streaming reader via the Python data source API will
be able to add support for Admission Control and Trigger.AvailableNow, both of
which were major missing features.
### How was this patch tested?
New UTs.
### Was this patch authored or co-authored using generative AI tooling?
Co-authored using claude-4.5-sonnet
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]