jiteshsoni opened a new pull request, #53001:
URL: https://github.com/apache/spark/pull/53001

   ## What changes were proposed in this pull request?
   
   This PR adds admission control support to the Python DataSource streaming 
API, bringing it to feature parity with Scala's `SupportsAdmissionControl` 
interface.
   
   **Note:** JIRA account pending approval. Will update `SPARK-XXXXX` with 
actual JIRA number once account is created.
   
   ### Problem
   Currently, Python streaming data sources cannot control microbatch sizes 
because the `DataSourceStreamReader.latestOffset()` method has no parameters to 
receive the configured limits. This forces Python sources to either:
   1. Process all available data (unpredictable resource usage)
   2. Artificially limit offsets (risking data loss)
   
   In contrast, Scala sources can implement `SupportsAdmissionControl` to 
properly control batch sizes.
   
   ### Solution
   This PR extends the Python DataSource API to support admission control by:
   
   1. **Enhanced Python API**: Updated `DataSourceStreamReader.latestOffset()` 
to accept optional `start_offset` and `read_limit` parameters
   2. **Scala Bridge**: Modified `PythonMicroBatchStream` to implement 
`SupportsAdmissionControl`
   3. **Serialization**: Added `ReadLimit` serialization in 
`PythonStreamingSourceRunner`
   4. **Python Worker**: Enhanced `python_streaming_source_runner.py` to 
deserialize and pass parameters
   5. **Monitoring**: Added optional `reportLatestOffset()` method for 
observability
   
   ### Key Features
   - ✅ Full backward compatibility (all parameters optional)
   - ✅ Support for all `ReadLimit` types (maxRows, maxFiles, maxBytes, minRows, 
composite)
   - ✅ Parity with Scala DataSource capabilities
   - ✅ Comprehensive test coverage
   - ✅ Documentation and example included
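
As an illustration of how a reader might handle several `ReadLimit` types in one place, here is a minimal sketch in plain Python. Only the `{"type": "maxRows", "maxRows": N}` shape appears in this description; the `minRows` and `composite` dictionary shapes, their semantics, and the helper name `cap_end_offset` are assumptions made for the example, not the PR's confirmed wire format.

```python
from typing import Optional


def cap_end_offset(start: int, available: int, read_limit: Optional[dict]) -> int:
    """Return the end offset for one microbatch, given a read limit dictionary.

    Only the "maxRows" shape is taken from the PR description; the "minRows"
    and "composite" shapes handled below are assumptions.
    """
    if not read_limit:
        return available  # no limit configured: take everything available

    kind = read_limit.get("type")
    if kind == "maxRows":
        return min(start + int(read_limit["maxRows"]), available)
    if kind == "minRows":
        # Assumed semantics: hold the batch back until enough rows have arrived.
        enough = available - start >= int(read_limit["minRows"])
        return available if enough else start
    if kind == "composite":
        # Assumed shape: {"type": "composite", "limits": [<limit>, ...]}.
        # The most restrictive component decides the end offset.
        ends = [cap_end_offset(start, available, part) for part in read_limit["limits"]]
        return min(ends) if ends else available
    return available  # unknown types fall back to "all available"
```

Returning `start` unchanged for an unmet `minRows` limit signals "no new data yet" in this sketch; a real source would also need to account for any trigger delay attached to that limit.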
   
   ## Why are the changes needed?
   
   This change is critical for production streaming workloads using Python 
DataSources:
   
   1. **Predictable Performance**: Control batch sizes for stable resource usage
   2. **Rate Limiting**: Respect API rate limits and backpressure requirements
   3. **Feature Parity**: Python sources gain the same capabilities as built-in 
Scala sources (Kafka, Delta)
   4. **Better Testing**: Easier to test with controlled batch sizes
   5. **Production Ready**: Essential for reliable production deployments
   
   ## Does this PR introduce any user-facing changes?
   
   **Yes - API Enhancement (Backward Compatible)**
   
   ### New API Signature
   
   ```python
   from typing import Optional


   class DataSourceStreamReader:
       def latestOffset(
           self,
           start_offset: Optional[dict] = None,
           read_limit: Optional[dict] = None,
       ) -> dict:
           """Returns the most recent offset available, optionally capped by a read limit."""
           pass

       def reportLatestOffset(self) -> Optional[dict]:
           """Returns the absolute latest offset available (for monitoring)."""
           return None
   ```
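
To show how the two methods could complement each other, the sketch below derives a backlog figure from the capped offset returned by `latestOffset()` and the uncapped offset returned by `reportLatestOffset()`. The `CountingReader` class and the `backlog` helper are hypothetical and use no Spark classes; how Spark itself surfaces the reported offset is not described here.

```python
from typing import Optional


class CountingReader:
    """Hypothetical reader over a source that currently holds `total` rows."""

    def __init__(self, total: int):
        self.total = total

    def latestOffset(
        self,
        start_offset: Optional[dict] = None,
        read_limit: Optional[dict] = None,
    ) -> dict:
        start = start_offset["offset"] if start_offset else 0
        if read_limit and read_limit.get("type") == "maxRows":
            return {"offset": min(start + read_limit["maxRows"], self.total)}
        return {"offset": self.total}

    def reportLatestOffset(self) -> Optional[dict]:
        # Uncapped view of the source, independent of any read limit.
        return {"offset": self.total}


def backlog(reader, planned_end: dict) -> int:
    """Rows still waiting after the batch that ends at `planned_end`."""
    latest = reader.reportLatestOffset()
    return 0 if latest is None else latest["offset"] - planned_end["offset"]


reader = CountingReader(total=1000)
end = reader.latestOffset({"offset": 0}, {"type": "maxRows", "maxRows": 100})
print(backlog(reader, end))
```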
   
   ### Usage Example
   
   ```python
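   from pyspark.sql.datasource import DataSourceStreamReader


   class MyStreamReader(DataSourceStreamReader):
       def latestOffset(self, start_offset=None, read_limit=None):
           start = start_offset["offset"] if start_offset else 0
           # query_latest_available() stands in for a source-specific lookup
           available = self.query_latest_available()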
           
           # Apply admission control
           if read_limit and read_limit.get("type") == "maxRows":
               max_rows = read_limit["maxRows"]
               end = min(start + max_rows, available)
           else:
               end = available
               
           return {"offset": end}
   
   # Configure batch size
   df = (
       spark.readStream.format("my_source")
       .option("maxRecordsPerBatch", "100")
       .load()
   )
   ```
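
To see the capping logic at work without a Spark session, the same `latestOffset` body can be driven by hand. This is a sketch under stated assumptions: `FakeStreamReader` and `plan_batches` are hypothetical stand-ins, and the loop only imitates how an engine might request one end offset per microbatch.

```python
class FakeStreamReader:
    """Stand-in for MyStreamReader that needs no Spark installation."""

    def __init__(self, available: int):
        self.available = available

    def query_latest_available(self) -> int:
        return self.available

    def latestOffset(self, start_offset=None, read_limit=None):
        start = start_offset["offset"] if start_offset else 0
        available = self.query_latest_available()
        if read_limit and read_limit.get("type") == "maxRows":
            end = min(start + read_limit["maxRows"], available)
        else:
            end = available
        return {"offset": end}


def plan_batches(reader, read_limit):
    """Call latestOffset repeatedly until the reader reports no new data."""
    batches = []
    start = {"offset": 0}
    while True:
        end = reader.latestOffset(start, read_limit)
        if end == start:
            break  # caught up: the end offset did not advance
        batches.append((start["offset"], end["offset"]))
        start = end
    return batches


batches = plan_batches(FakeStreamReader(available=250), {"type": "maxRows", "maxRows": 100})
print(batches)  # [(0, 100), (100, 200), (200, 250)]
```

With 250 rows available and a limit of 100, the backlog drains in three bounded batches instead of a single batch of 250.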
   
   ### Backward Compatibility
   - All existing Python streaming sources continue to work without modification
   - Parameters are optional with default values
   - No breaking changes to existing API
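
One way a caller could keep legacy readers working is to inspect the signature of `latestOffset` and pass only the arguments the reader declares. This description does not say how the runner actually dispatches, so the sketch below is an assumption; `call_latest_offset`, `LegacyReader`, and `LimitAwareReader` are hypothetical names.

```python
import inspect


def call_latest_offset(reader, start_offset=None, read_limit=None):
    """Invoke reader.latestOffset with only the arguments it declares."""
    params = inspect.signature(reader.latestOffset).parameters
    kwargs = {}
    if "start_offset" in params:
        kwargs["start_offset"] = start_offset
    if "read_limit" in params:
        kwargs["read_limit"] = read_limit
    return reader.latestOffset(**kwargs)


class LegacyReader:
    """Written against the old API: latestOffset takes no arguments."""

    def latestOffset(self):
        return {"offset": 500}


class LimitAwareReader:
    """Written against the extended API with optional parameters."""

    def latestOffset(self, start_offset=None, read_limit=None):
        start = start_offset["offset"] if start_offset else 0
        if read_limit and read_limit.get("type") == "maxRows":
            return {"offset": min(start + read_limit["maxRows"], 500)}
        return {"offset": 500}


limit = {"type": "maxRows", "maxRows": 100}
print(call_latest_offset(LegacyReader(), {"offset": 0}, limit))
print(call_latest_offset(LimitAwareReader(), {"offset": 0}, limit))
```

The legacy reader ignores the limit and returns everything available, which matches its behavior before the change.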
   
   ## How was this patch tested?
   
   ### Unit Tests
   - ✅ 9 new unit tests in `test_streaming_datasource_admission_control.py`
   - ✅ Tests for all `ReadLimit` dictionary formats
   - ✅ Tests for backward compatibility
   - ✅ All existing streaming tests pass
   
   ### Integration Tests
   - ✅ New example: `structured_blockchain_admission_control.py`
   - ✅ Demonstrates admission control in action
   - ✅ Full Spark build completed successfully (12 min)
   - ✅ Python tests passed (9/9)
   
   ### Test Environment
   - Java: OpenJDK 17
   - Python: 3.10.19
   - Build: `./build/mvn clean package -DskipTests -Phive`
   - Tests: `python/run-tests --testnames 'pyspark.sql.tests.streaming.test_streaming_datasource_admission_control'`
   
   ## Files Changed
   
   **Total**: 8 files changed, 842 insertions(+), 30 deletions(-)
   
   ### Scala Changes
   - `sql/core/.../python/PythonMicroBatchStream.scala` - Implement 
`SupportsAdmissionControl`
   - `sql/core/.../python/streaming/PythonStreamingSourceRunner.scala` - 
Serialize `ReadLimit`
   
   ### Python Changes
   - `python/pyspark/sql/datasource.py` - Enhanced API signature
   - `python/pyspark/sql/streaming/python_streaming_source_runner.py` - 
Deserialize parameters
   - `python/pyspark/sql/datasource_internal.py` - Internal updates
   
   ### Tests & Examples
   - `python/pyspark/sql/tests/streaming/test_streaming_datasource_admission_control.py` - Unit tests
   - `examples/.../structured_blockchain_admission_control.py` - Demonstration
   
   ### Documentation
   - `python/docs/source/tutorial/sql/python_data_source.rst` - Tutorial updates
   
   ## License Declaration
   
   I confirm that this contribution is my original work and I license the work 
to the Apache Spark project under the Apache License 2.0.
   
   ---
   
   **Note**: JIRA account is pending approval (account request submitted). I 
will update the PR title and reference with the actual JIRA number once my 
account is approved. In the meantime, this PR is ready for technical review.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

