brijrajk opened a new pull request, #56178:
URL: https://github.com/apache/spark/pull/56178

   ### What changes were proposed in this pull request?
   
   `GroupState.setTimeoutDuration` previously accepted only an integer milliseconds value. This PR
   extends it to also accept a Spark interval string (e.g. `"5 minutes"`, `"1 hour 30 minutes"`,
   `"1.5 seconds"`), matching the behaviour of the Scala API's
   `GroupStateImpl.setTimeoutDuration(String)` overload.
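
   For reference, these are the millisecond values that the example strings above should resolve
   to. The `MS_PER_*` constants and `EXPECTED_MS` are illustrative names, not part of the PR:

```python
# Illustrative unit sizes in milliseconds (hypothetical names, not from the PR).
MS_PER_SECOND = 1_000
MS_PER_MINUTE = 60 * MS_PER_SECOND
MS_PER_HOUR = 60 * MS_PER_MINUTE

# Expected integer-millisecond equivalents of the example interval strings.
EXPECTED_MS = {
    "5 minutes": 5 * MS_PER_MINUTE,                         # 300,000
    "1 hour 30 minutes": MS_PER_HOUR + 30 * MS_PER_MINUTE,  # 5,400,000
    "1.5 seconds": 3 * MS_PER_SECOND // 2,                  # 1,500
}
```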
   
   Changes:
   - Added `_parse_timeout_duration(duration: str) -> int` helper in
     `python/pyspark/sql/streaming/state.py` that converts a Spark interval string to milliseconds.
     Parsing behaviour mirrors Scala's `IntervalUtils.stringToInterval` and `IntervalUtils.getDuration`,
     including the 31-days-per-month convention that Structured Streaming also uses for watermark delays.
   - Updated `setTimeoutDuration` to accept `Union[int, str]` and call the helper when a string is
     passed.
   - Added `INVALID_TIMEOUT_DURATION_STRING` error class to
     `python/pyspark/errors/error-conditions.json`.
   - Added `python/pyspark/sql/tests/streaming/test_state.py` with 27 unit tests covering all
     supported units, months/years (31-day convention), negative component offsets, fractional seconds,
     leading-dot decimals (`.5 seconds`), explicit `+`/`-` signs, whitespace between sign and
     quantity, the `interval` keyword prefix, compound durations, case-insensitivity, and various
     invalid-input cases.
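
   The parsing and dispatch described in the list above can be sketched in pure Python. This is a
   simplified sketch under stated assumptions, not the PR's `_parse_timeout_duration`:
   `parse_interval_ms` and `timeout_duration_to_ms` are hypothetical names, and the grammar (units
   `year` through `microsecond` in singular or plural form, an optional `interval` prefix,
   per-component `+`/`-` signs, fractions only on seconds) is an approximation of what
   `IntervalUtils.stringToInterval` accepts. Months count as 31 days and years as 12 such months,
   following the convention described above. It raises a plain `ValueError` where the PR uses
   its own error class.

```python
import re
from decimal import Decimal
from typing import Union

# Hypothetical sketch, not the PR's `_parse_timeout_duration`.
_MICROS_PER_DAY = 86_400 * 1_000_000

# Size of each unit in microseconds. Assumes a 31-day month; a year is 12 such months.
_MICROS_PER_UNIT = {
    "microsecond": 1,
    "millisecond": 1_000,
    "second": 1_000_000,
    "minute": 60 * 1_000_000,
    "hour": 3_600 * 1_000_000,
    "day": _MICROS_PER_DAY,
    "week": 7 * _MICROS_PER_DAY,
    "month": 31 * _MICROS_PER_DAY,
    "year": 12 * 31 * _MICROS_PER_DAY,
}

# One "<sign> <number> <unit>" component, e.g. "-30 minutes", "+ 2 days" or ".5 seconds".
_COMPONENT = re.compile(r"\s*([+-]?)\s*(\d+(?:\.\d*)?|\.\d+)\s+([a-z]+)\s*")


def parse_interval_ms(duration: str) -> int:
    """Convert an interval string such as '1 hour 30 minutes' to milliseconds."""
    text = duration.strip().lower()  # case-insensitive matching
    prefix = re.match(r"interval\b", text)  # optional leading `interval` keyword
    if prefix:
        text = text[prefix.end():]
    if not text.strip():
        raise ValueError(f"empty interval string: {duration!r}")

    total_micros = 0
    pos = 0
    while pos < len(text):
        m = _COMPONENT.match(text, pos)
        if m is None:
            raise ValueError(f"cannot parse interval string: {duration!r}")
        sign, number, unit = m.groups()
        # Accept both singular and plural unit names ("hour" / "hours").
        base = unit[:-1] if unit.endswith("s") else unit
        if base not in _MICROS_PER_UNIT:
            raise ValueError(f"unknown unit {unit!r} in {duration!r}")
        if "." in number and base != "second":
            raise ValueError(f"fractions are only allowed for seconds: {duration!r}")
        # Decimal keeps "1.5 seconds" exact instead of going through a float.
        micros = int(Decimal(number) * _MICROS_PER_UNIT[base])
        total_micros += -micros if sign == "-" else micros
        pos = m.end()

    # Truncate toward zero when converting microseconds to milliseconds.
    ms = abs(total_micros) // 1_000
    return -ms if total_micros < 0 else ms


def timeout_duration_to_ms(duration: Union[int, str]) -> int:
    """Accept either integer milliseconds or an interval string; reject non-positive results."""
    ms = parse_interval_ms(duration) if isinstance(duration, str) else duration
    if ms <= 0:
        raise ValueError(f"timeout duration must be positive, got {duration!r}")
    return ms
```

   In this sketch, integers bypass the parser entirely, so the existing integer path keeps its
   behaviour and only string inputs go through interval parsing. Sub-millisecond inputs such as
   `"500 microseconds"` truncate to `0` and are then rejected as non-positive.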
   
   ### Why are the changes needed?
   
   The Scala API supports both `setTimeoutDuration(long durationMs)` and
   `setTimeoutDuration(String duration)`. The Python implementation only supported the integer form,
   leaving users unable to use human-readable interval strings as described in SPARK-40437.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Yes. `GroupState.setTimeoutDuration` now also accepts a Spark interval string such as
   `"5 minutes"` or `"1 hour 30 minutes"`. The integer form continues to work unchanged.
   This change is relative to the unreleased master branch.
   
   ### How was this patch tested?
   
   27 new pure-Python unit tests in `python/pyspark/sql/tests/streaming/test_state.py`, covering
   both positive cases (all units, compound durations, fractional seconds, edge-case signs and
   whitespace) and negative cases (invalid strings, non-positive durations, wrong timeout mode).
   
   Tests can be run without a full Spark build:
   
       source .venv/bin/activate
       PYTHONPATH=python python3 -m unittest pyspark.sql.tests.streaming.test_state -v
   
   ### Was this patch authored or co-authored using generative AI tooling?
   
   Generated-by: Claude (Anthropic)
   