jiteshsoni opened a new pull request, #53085:
URL: https://github.com/apache/spark/pull/53085

   ## What changes were proposed in this pull request?
   
   This PR adds admission control support to Python streaming data sources, 
enabling users to control microbatch sizes through `maxRecordsPerBatch`, 
`maxFilesPerBatch`, and `maxBytesPerBatch` options.
   
   **Changes:**
   - Enhanced `DataSourceStreamReader.latestOffset()` to accept optional 
`start` and `limit` parameters
   - Updated `PythonMicroBatchStream` to validate admission control options 
(throws `IllegalArgumentException` for invalid values)
   - Added Python-JVM communication handlers for admission control parameters
   - Implemented backward compatibility fallback logic in 
`python_streaming_source_runner.py`
   - Added comprehensive documentation and an example 
(`structured_blockchain_admission_control.py`)
   - Added test cases validating invalid parameter handling
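
As a rough illustration of the first change, a reader that honors a record limit might look like the sketch below. This is a plain-Python stand-in, not the PR's code: the class `CountingReader`, the dict-shaped offsets, and the assumption that `limit` arrives as a dict keyed by option name (here `maxRecordsPerBatch`) are all hypothetical.

```python
from typing import Optional


class CountingReader:
    """Hypothetical reader over a source holding a known number of records."""

    def __init__(self, available: int) -> None:
        self.available = available  # records currently present in the source

    def initialOffset(self) -> dict:
        return {"offset": 0}

    def latestOffset(
        self, start: Optional[dict] = None, limit: Optional[dict] = None
    ) -> dict:
        # Assumed shape: limit is a dict such as {"maxRecordsPerBatch": 25}.
        max_records = (limit or {}).get("maxRecordsPerBatch")
        if max_records is None:
            # No limit configured: expose everything, like the zero-argument form.
            return {"offset": self.available}
        start_pos = start["offset"] if start else 0
        # Cap the end offset so the batch holds at most max_records records.
        return {"offset": min(self.available, start_pos + int(max_records))}
```

With 100 records available, a start offset of 10 and a limit of 25 would yield an end offset of 35, while calling `latestOffset()` with no arguments still returns the full range.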
   
   ## Why are the changes needed?
   
   Python streaming data sources previously could not control microbatch sizes 
because `latestOffset()` had no parameters to receive configured limits. This 
forced Python sources to either process all available data (unpredictable 
resource usage) or artificially limit offsets (risking data loss). Scala 
sources have this capability via `SupportsAdmissionControl`.
   
   ## Does this PR introduce any user-facing change?
   
   Yes. Users can now implement admission control in custom Python streaming 
sources:
   - Simple API users get automatic admission control support
   - Full API users can implement `latestOffset(start, limit)` for fine-grained 
control
   - Fully backward compatible (old implementations continue to work)
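
One way such backward compatibility could work is to inspect the reader's `latestOffset` signature and only pass the new arguments when they are accepted. The sketch below is an assumption about the general approach, not the logic in `python_streaming_source_runner.py`; the helper `call_latest_offset` and both reader classes are hypothetical.

```python
import inspect


def call_latest_offset(reader, start, limit):
    """Call latestOffset(start, limit) if the reader accepts it, else latestOffset()."""
    params = inspect.signature(reader.latestOffset).parameters.values()
    positional = [
        p
        for p in params
        if p.kind
        in (inspect.Parameter.POSITIONAL_ONLY, inspect.Parameter.POSITIONAL_OR_KEYWORD)
    ]
    has_var_args = any(p.kind is inspect.Parameter.VAR_POSITIONAL for p in params)
    if has_var_args or len(positional) >= 2:
        return reader.latestOffset(start, limit)
    # Old-style implementation: fall back to the zero-argument call.
    return reader.latestOffset()


class OldReader:
    """Hypothetical reader written against the zero-argument signature."""

    def latestOffset(self):
        return {"offset": 100}


class NewReader:
    """Hypothetical reader that honors a record limit."""

    def latestOffset(self, start=None, limit=None):
        cap = (limit or {}).get("maxRecordsPerBatch", 100)
        return {"offset": min(100, (start or {"offset": 0})["offset"] + cap)}
```

Checking the signature up front, rather than catching `TypeError` from the call, avoids masking a `TypeError` raised inside the user's own implementation.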
   
   ## How was this patch tested?
   
   - Added Scala test cases validating `IllegalArgumentException` for invalid 
admission control values
   - Existing streaming tests pass (verifying backward compatibility)
   - All linting/formatting checks pass (scalastyle, scalafmt, lint-scala, 
lint-python, Black, mypy)
   - Example `structured_blockchain_admission_control.py` demonstrates the 
feature
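
For illustration, the kind of check those tests exercise could be sketched in Python as follows. The PR performs its validation in Scala and throws `IllegalArgumentException`; the rule assumed here (each option, when present, must parse as a positive integer), the helper name `parse_admission_options`, and the use of `ValueError` are all hypothetical.

```python
ADMISSION_OPTIONS = ("maxRecordsPerBatch", "maxFilesPerBatch", "maxBytesPerBatch")


def parse_admission_options(options: dict) -> dict:
    """Return the admission control options as ints, rejecting invalid values."""
    parsed = {}
    for name in ADMISSION_OPTIONS:
        if name not in options:
            continue
        raw = options[name]
        try:
            value = int(raw)
        except (TypeError, ValueError):
            raise ValueError(f"{name} must be a positive integer, got {raw!r}") from None
        if value <= 0:
            raise ValueError(f"{name} must be a positive integer, got {raw!r}")
        parsed[name] = value
    return parsed
```

Options that are not admission control settings are ignored, so the helper can be handed the full option map of a stream.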
   
   ## Was this patch authored or co-authored using generative AI tooling?
   
   No.
   
   Closes [SPARK-54305](https://issues.apache.org/jira/browse/SPARK-54305)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

