jiteshsoni opened a new pull request, #53001:
URL: https://github.com/apache/spark/pull/53001
## What changes were proposed in this pull request?
This PR adds admission control support to the Python DataSource streaming
API, bringing it to feature parity with Scala's `SupportsAdmissionControl`
interface.
**Note:** JIRA account pending approval. Will update `SPARK-XXXXX` with
actual JIRA number once account is created.
### Problem
Currently, Python streaming data sources cannot control microbatch sizes
because the `DataSourceStreamReader.latestOffset()` method has no parameters to
receive the configured limits. This forces Python sources to either:
1. Process all available data (unpredictable resource usage)
2. Artificially limit offsets (risking data loss)
In contrast, Scala sources can implement `SupportsAdmissionControl` to
properly control batch sizes.
### Solution
This PR extends the Python DataSource API to support admission control by:
1. **Enhanced Python API**: Updated `DataSourceStreamReader.latestOffset()`
to accept optional `start_offset` and `read_limit` parameters
2. **Scala Bridge**: Modified `PythonMicroBatchStream` to implement
`SupportsAdmissionControl`
3. **Serialization**: Added `ReadLimit` serialization in
`PythonStreamingSourceRunner`
4. **Python Worker**: Enhanced `python_streaming_source_runner.py` to
deserialize and pass parameters
5. **Monitoring**: Added optional `reportLatestOffset()` method for
observability
### Key Features
- ✅ Full backward compatibility (all parameters optional)
- ✅ Support for all `ReadLimit` types (maxRows, maxFiles, maxBytes, minRows,
composite)
- ✅ Parity with Scala DataSource capabilities
- ✅ Comprehensive test coverage
- ✅ Documentation and example included
## Why are the changes needed?
This change is critical for production streaming workloads using Python
DataSources:
1. **Predictable Performance**: Control batch sizes for stable resource usage
2. **Rate Limiting**: Respect API rate limits and backpressure requirements
3. **Feature Parity**: Python sources gain same capabilities as built-in
Scala sources (Kafka, Delta)
4. **Better Testing**: Easier to test with controlled batch sizes
5. **Production Ready**: Essential for reliable production deployments
## Does this PR introduce any user-facing changes?
**Yes - API Enhancement (Backward Compatible)**
### New API Signature
```python
class DataSourceStreamReader:
def latestOffset(
self,
start_offset: Optional[dict] = None,
read_limit: Optional[dict] = None
) -> dict:
"""Returns the most recent offset available, optionally capped by a
read limit."""
pass
def reportLatestOffset(self) -> Optional[dict]:
"""Returns the absolute latest offset available (for monitoring)."""
return None
```
### Usage Example
```python
class MyStreamReader(DataSourceStreamReader):
def latestOffset(self, start_offset=None, read_limit=None):
start = start_offset["offset"] if start_offset else 0
available = self.query_latest_available()
# Apply admission control
if read_limit and read_limit.get("type") == "maxRows":
max_rows = read_limit["maxRows"]
end = min(start + max_rows, available)
else:
end = available
return {"offset": end}
# Configure batch size
df = spark.readStream.format("my_source").option("maxRecordsPerBatch",
"100").load()
```
### Backward Compatibility
- All existing Python streaming sources continue to work without modification
- Parameters are optional with default values
- No breaking changes to existing API
## How was this patch tested?
### Unit Tests
- ✅ 9 new unit tests in `test_streaming_datasource_admission_control.py`
- ✅ Tests for all `ReadLimit` dictionary formats
- ✅ Tests for backward compatibility
- ✅ All existing streaming tests pass
### Integration Tests
- ✅ New example: `structured_blockchain_admission_control.py`
- ✅ Demonstrates admission control in action
- ✅ Full Spark build completed successfully (12 min)
- ✅ Python tests passed (9/9)
### Test Environment
- Java: OpenJDK 17
- Python: 3.10.19
- Build: `./build/mvn clean package -DskipTests -Phive`
- Tests: `python/run-tests --testnames
'pyspark.sql.tests.streaming.test_streaming_datasource_admission_control'`
## Files Changed
**Total**: 8 files changed, 842 insertions(+), 30 deletions(-)
### Scala Changes
- `sql/core/.../python/PythonMicroBatchStream.scala` - Implement
`SupportsAdmissionControl`
- `sql/core/.../python/streaming/PythonStreamingSourceRunner.scala` -
Serialize `ReadLimit`
### Python Changes
- `python/pyspark/sql/datasource.py` - Enhanced API signature
- `python/pyspark/sql/streaming/python_streaming_source_runner.py` -
Deserialize parameters
- `python/pyspark/sql/datasource_internal.py` - Internal updates
### Tests & Examples
-
`python/pyspark/sql/tests/streaming/test_streaming_datasource_admission_control.py`
- Unit tests
- `examples/.../structured_blockchain_admission_control.py` - Demonstration
### Documentation
- `python/docs/source/tutorial/sql/python_data_source.rst` - Tutorial updates
## License Declaration
I confirm that this contribution is my original work and I license the work
to the Apache Spark project under the Apache License 2.0.
---
**Note**: JIRA account is pending approval (account request submitted). I
will update the PR title and reference with the actual JIRA number once my
account is approved. In the meantime, this PR is ready for technical review.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]