brijrajk opened a new pull request, #56178:
URL: https://github.com/apache/spark/pull/56178
### What changes were proposed in this pull request?
`GroupState.setTimeoutDuration` previously accepted only an integer number of milliseconds.
This PR extends it to also accept a Spark interval string (e.g. `"5 minutes"`,
`"1 hour 30 minutes"`, `"1.5 seconds"`), matching the behaviour of the Scala API's
`GroupStateImpl.setTimeoutDuration(String)` overload.
Changes:
- Added a `_parse_timeout_duration(duration: str) -> int` helper in
  `python/pyspark/sql/streaming/state.py` that converts a Spark interval string to milliseconds.
  Parsing behaviour mirrors Scala's `IntervalUtils.stringToInterval` and
  `IntervalUtils.getDuration` (31 days/month convention for structured streaming watermarks).
- Updated `setTimeoutDuration` to accept `Union[int, str]` and call the helper when a string is
  passed.
- Added the `INVALID_TIMEOUT_DURATION_STRING` error class to
  `python/pyspark/errors/error-conditions.json`.
- Added `python/pyspark/sql/tests/streaming/test_state.py` with 27 unit tests covering: all
  supported units, months/years (31-day convention), negative component offsets, fractional
  seconds, leading-dot decimals (`.5 seconds`), explicit `+`/`-` signs, whitespace between sign
  and quantity, the `interval` keyword prefix, compound durations, case-insensitivity, and
  various invalid-input cases.
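
For readers who want a feel for the conversion described above, here is a minimal sketch of an interval-string-to-milliseconds parser. It is not the helper from this PR: the name `parse_timeout_duration_sketch`, the regular expression, the unit table, and the use of a plain `ValueError` (in place of the `INVALID_TIMEOUT_DURATION_STRING` error class) are all assumptions made for illustration. Restricting fractions to seconds is likewise an assumption modelled on the "fractional seconds" wording in the test list, and microsecond units are left out.

```python
import re
from decimal import Decimal

# Milliseconds per unit. Months and years follow the 31 days/month
# convention mentioned in the PR description.
_MS_PER_DAY = 24 * 60 * 60 * 1000
_UNIT_TO_MS = {
    "year": 12 * 31 * _MS_PER_DAY,
    "month": 31 * _MS_PER_DAY,
    "week": 7 * _MS_PER_DAY,
    "day": _MS_PER_DAY,
    "hour": 60 * 60 * 1000,
    "minute": 60 * 1000,
    "second": 1000,
    "millisecond": 1,
}

# One component: optional sign, optional whitespace, a number (with an
# optional fraction or a leading dot), optional whitespace, a unit word.
_COMPONENT = re.compile(
    r"\s*([+-]?)\s*(\d+(?:\.\d+)?|\.\d+)\s*([a-z]+)", re.IGNORECASE
)


def parse_timeout_duration_sketch(duration: str) -> int:
    """Convert an interval string to milliseconds (illustrative only)."""
    text = duration.strip()
    # Accept an optional leading "interval" keyword.
    prefix = re.match(r"interval\s+", text, re.IGNORECASE)
    if prefix:
        text = text[prefix.end():]
    if not text:
        raise ValueError(f"Invalid timeout duration string: {duration!r}")

    total = Decimal(0)
    pos = 0
    while pos < len(text):
        match = _COMPONENT.match(text, pos)
        if match is None:
            raise ValueError(f"Invalid timeout duration string: {duration!r}")
        sign, number, unit = match.groups()
        unit = unit.lower()
        # Accept both singular and plural unit names.
        if unit.endswith("s") and unit[:-1] in _UNIT_TO_MS:
            unit = unit[:-1]
        if unit not in _UNIT_TO_MS:
            raise ValueError(f"Unknown unit in: {duration!r}")
        # Assumption: only seconds may carry a fractional part.
        if "." in number and unit != "second":
            raise ValueError(f"Fraction only allowed for seconds: {duration!r}")
        value = Decimal(number) * _UNIT_TO_MS[unit]
        total += -value if sign == "-" else value
        pos = match.end()
    # Sub-millisecond remainders are truncated toward zero.
    return int(total)
```

Under these assumptions, `parse_timeout_duration_sketch("5 minutes")` returns `300000` and `parse_timeout_duration_sketch("1 hour 30 minutes")` returns `5400000`. The sketch returns the signed total and leaves the positivity check to the caller.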
### Why are the changes needed?
The Scala API supports both `setTimeoutDuration(long durationMs)` and
`setTimeoutDuration(String duration)`. The Python implementation only supported the integer form,
leaving users unable to use human-readable interval strings as described in SPARK-40437.
### Does this PR introduce _any_ user-facing change?
Yes. `GroupState.setTimeoutDuration` now also accepts a Spark interval string such as
`"5 minutes"` or `"1 hour 30 minutes"`. The integer form continues to work unchanged.
This change is relative to the unreleased master branch.
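
As a rough illustration of the user-facing behaviour, the sketch below shows how a setter might dispatch on `Union[int, str]` and then apply the same checks to both forms. `TimeoutHolderSketch` is a hypothetical stand-in, not PySpark's `GroupState`, and `minutes_only` is a deliberately tiny toy converter standing in for the real helper. The exception types and the order of the checks are assumptions.

```python
from typing import Callable, Optional, Union


class TimeoutHolderSketch:
    """Hypothetical stand-in for GroupState; not PySpark code."""

    def __init__(
        self,
        parse: Callable[[str], int],
        processing_time_timeout: bool = True,
    ) -> None:
        self._parse = parse
        self._processing_time_timeout = processing_time_timeout
        self.timeout_ms: Optional[int] = None

    def set_timeout_duration(self, duration: Union[int, str]) -> None:
        # A duration only makes sense in processing-time timeout mode.
        if not self._processing_time_timeout:
            raise RuntimeError("timeout duration requires processing-time timeout")
        # Strings go through the converter; integers are used as-is.
        if isinstance(duration, str):
            duration_ms = self._parse(duration)
        else:
            duration_ms = duration
        # Both forms share the same positivity check.
        if duration_ms <= 0:
            raise ValueError("timeout duration must be positive")
        self.timeout_ms = duration_ms


def minutes_only(duration: str) -> int:
    """Toy converter: understands only '<integer> minutes'."""
    count, unit = duration.split()
    if unit != "minutes":
        raise ValueError(f"unsupported duration: {duration!r}")
    return int(count) * 60_000


holder = TimeoutHolderSketch(minutes_only)
holder.set_timeout_duration("5 minutes")  # string form
string_form_ms = holder.timeout_ms
holder.set_timeout_duration(1500)         # integer form still works
int_form_ms = holder.timeout_ms
```

Here `string_form_ms` is `300000` and `int_form_ms` is `1500`. Injecting the converter keeps the dispatch logic separate from the parsing rules, so each can be tested on its own.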
### How was this patch tested?
27 new pure-Python unit tests in `python/pyspark/sql/tests/streaming/test_state.py`, covering
both positive cases (all units, compound durations, fractional seconds, edge-case signs and
whitespace) and negative cases (invalid strings, non-positive durations, wrong timeout mode).
Tests can be run without a full Spark build:

    source .venv/bin/activate
    PYTHONPATH=python python3 -m unittest pyspark.sql.tests.streaming.test_state -v

### Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude (Anthropic)