Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-27 Thread via GitHub


HeartSaVioR commented on PR #48838:
URL: https://github.com/apache/spark/pull/48838#issuecomment-2505359630

   CI has passed: https://github.com/jingz-db/spark/runs/33636466911
   
   Thanks! Merging to master.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-27 Thread via GitHub


HeartSaVioR closed pull request #48838: [SPARK-50194][SS][PYTHON] Integration 
of New Timer API and Initial State API with Timer
URL: https://github.com/apache/spark/pull/48838





Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-27 Thread via GitHub


HeartSaVioR commented on PR #48838:
URL: https://github.com/apache/spark/pull/48838#issuecomment-2505246642

   I just pushed a commit addressing my own review comments as well as the linter failure. These are nits, so I think it shouldn't matter.





Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-27 Thread via GitHub


HeartSaVioR commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1861444940


##
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##
@@ -558,14 +559,19 @@ def prepare_batch3(input_path):
     def test_transform_with_state_in_pandas_event_time(self):
         def check_results(batch_df, batch_id):
             if batch_id == 0:
-                assert set(batch_df.sort("id").collect()) == {Row(id="a", timestamp="20")}
-            elif batch_id == 1:
+                # check timer registered in the same batch is expired

Review Comment:
   nit: let's add comments on the `watermark for late event` and the `watermark for eviction` per batch, to help verify the output. E.g. in batch_id == 1, the watermark for eviction is 10 but the watermark for late event is 0, hence 4 is accepted. The timestamp value in the expired row follows the `watermark for eviction`, so noting that is also helpful.
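   For illustration, a minimal sketch of the kind of per-batch annotation being suggested, reusing the example values from this comment (the actual assertions are elided since they depend on the test's input data):

    # Sketch only - per-batch watermark annotations for check_results.
    def check_results(batch_df, batch_id):
        if batch_id == 1:
            # watermark for late event = 0, watermark for eviction = 10:
            # the row with event time 4 is still accepted, and the expired
            # row's timestamp follows the eviction watermark, i.e. "10".
            ...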



##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1197,7 +1198,11 @@ def generate_data_batches(batches):
         data_batches = generate_data_batches(_batches)
 
         for k, g in groupby(data_batches, key=lambda x: x[0]):
-            yield (k, g)
+            yield (TransformWithStateInPandasFuncMode.PROCESS_DATA, k, g)

Review Comment:
   nit: this looks inconsistent - here we build the tuple with explicit `()`, and in the class below we don't use `()`. Not a huge deal if the linter does not complain, but while we are here (the linter is failing)...






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-27 Thread via GitHub


jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1861339701


##
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##
@@ -858,33 +913,30 @@ def init(self, handle: StatefulProcessorHandle) -> None:
         self.handle = handle
         self.max_state = handle.getValueState("max_state", state_schema)
 
-    def handleInputRows(
-        self, key, rows, timer_values, expired_timer_info
-    ) -> Iterator[pd.DataFrame]:
-        if expired_timer_info.is_valid():
-            self.max_state.clear()
-            self.handle.deleteTimer(expired_timer_info.get_expiry_time_in_ms())
-            str_key = f"{str(key[0])}-expired"
-            yield pd.DataFrame(
-                {"id": (str_key,), "timestamp": str(expired_timer_info.get_expiry_time_in_ms())}
-            )
+    def handleExpiredTimer(self, key, timer_values, expired_timer_info) -> Iterator[pd.DataFrame]:
+        self.max_state.clear()
+        self.handle.deleteTimer(expired_timer_info.get_expiry_time_in_ms())
+        str_key = f"{str(key[0])}-expired"
+        yield pd.DataFrame(
+            {"id": (str_key,), "timestamp": str(expired_timer_info.get_expiry_time_in_ms())}
+        )
 
-        else:
-            timestamp_list = []
-            for pdf in rows:
-                # int64 will represent timestamp in nanosecond, restore to second
-                timestamp_list.extend((pdf["eventTime"].astype("int64") // 10**9).tolist())
+    def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
+        timestamp_list = []
+        for pdf in rows:
+            # int64 will represent timestamp in nanosecond, restore to second
+            timestamp_list.extend((pdf["eventTime"].astype("int64") // 10**9).tolist())
 
-            if self.max_state.exists():
-                cur_max = int(self.max_state.get()[0])
-            else:
-                cur_max = 0
-            max_event_time = str(max(cur_max, max(timestamp_list)))
+        if self.max_state.exists():
+            cur_max = int(self.max_state.get()[0])
+        else:
+            cur_max = 0
+        max_event_time = str(max(cur_max, max(timestamp_list)))
 
-            self.max_state.update((max_event_time,))
-            self.handle.registerTimer(timer_values.get_current_watermark_in_ms())
+        self.max_state.update((max_event_time,))
+        self.handle.registerTimer(timer_values.get_current_watermark_in_ms() + 1)

Review Comment:
   We should also check that a timer can expire in the same batch. So I am keeping the event time suite as a timer expiring in the same batch, and registering a future timestamp for the processing time suite.






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-27 Thread via GitHub


jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1861302160


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   Thanks for putting out the commit! I cherry-picked your change and this is 
now looking much cleaner!






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-27 Thread via GitHub


jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1861301227


##
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##
@@ -858,33 +913,30 @@ def init(self, handle: StatefulProcessorHandle) -> None:
         self.handle = handle
         self.max_state = handle.getValueState("max_state", state_schema)
 
-    def handleInputRows(
-        self, key, rows, timer_values, expired_timer_info
-    ) -> Iterator[pd.DataFrame]:
-        if expired_timer_info.is_valid():
-            self.max_state.clear()
-            self.handle.deleteTimer(expired_timer_info.get_expiry_time_in_ms())
-            str_key = f"{str(key[0])}-expired"
-            yield pd.DataFrame(
-                {"id": (str_key,), "timestamp": str(expired_timer_info.get_expiry_time_in_ms())}
-            )
+    def handleExpiredTimer(self, key, timer_values, expired_timer_info) -> Iterator[pd.DataFrame]:
+        self.max_state.clear()
+        self.handle.deleteTimer(expired_timer_info.get_expiry_time_in_ms())
+        str_key = f"{str(key[0])}-expired"
+        yield pd.DataFrame(
+            {"id": (str_key,), "timestamp": str(expired_timer_info.get_expiry_time_in_ms())}
+        )
 
-        else:
-            timestamp_list = []
-            for pdf in rows:
-                # int64 will represent timestamp in nanosecond, restore to second
-                timestamp_list.extend((pdf["eventTime"].astype("int64") // 10**9).tolist())
+    def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
+        timestamp_list = []
+        for pdf in rows:
+            # int64 will represent timestamp in nanosecond, restore to second
+            timestamp_list.extend((pdf["eventTime"].astype("int64") // 10**9).tolist())
 
-            if self.max_state.exists():
-                cur_max = int(self.max_state.get()[0])
-            else:
-                cur_max = 0
-            max_event_time = str(max(cur_max, max(timestamp_list)))
+        if self.max_state.exists():
+            cur_max = int(self.max_state.get()[0])
+        else:
+            cur_max = 0
+        max_event_time = str(max(cur_max, max(timestamp_list)))
 
-            self.max_state.update((max_event_time,))
-            self.handle.registerTimer(timer_values.get_current_watermark_in_ms())
+        self.max_state.update((max_event_time,))
+        self.handle.registerTimer(timer_values.get_current_watermark_in_ms() + 1)

Review Comment:
   I modified this to the current batch timestamp + 1 to test a more common use case, since registering a timer at the current batch timestamp is not a very common use case.






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-27 Thread via GitHub


HeartSaVioR commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1860396150


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   
https://github.com/HeartSaVioR/spark/commit/f8952b213ba7f2cbfbc78ef145552317812e9f9b
   
   This is the implementation of my suggestion, based on 0c5ab3f. I've confirmed that `pyspark.sql.tests.pandas.test_pandas_transform_with_state` passed with this change - I haven't added the new tests you've added later, though.
   
   I think this is a lot simpler - we just add two markers into the input iterator which carry over the mode, and the flow does not change at all. No tricks with teeing and chaining iterators, minimal changes to the data structure, etc.
   
   How does this work? It is the same as how we use iterators in Spark's Scala codebase: with an iterator in Scala, we pull one entry, process it, produce output, and then pull the next entry. The generator yields one entry per grouping key, then the marker for timers, and then the marker for completion. Each entry triggers the function which eventually calls the user function; the user function is expected to return an iterator, but the logic that produces the iterator should be synchronous (no async and no laziness - otherwise I guess it could even fail without my change).
   
   So by the time the marker for timers is evaluated, the function calls for all grouping keys must already have been done. The same holds for the marker for completion. This matches the Scala implementation.
   
   As a side effect, updating the phase is corrected in this commit.
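   A minimal, self-contained sketch of the marker idea described above. Only `TransformWithStateInPandasFuncMode.PROCESS_DATA` appears in the quoted diff; the `PROCESS_TIMER` and `COMPLETE` members and the helper names below are assumptions made purely for illustration:

    from enum import Enum
    from itertools import groupby


    class TransformWithStateInPandasFuncMode(Enum):
        PROCESS_DATA = 1     # appears in the quoted diff
        PROCESS_TIMER = 2    # hypothetical name for the timer marker
        COMPLETE = 3         # hypothetical name for the completion marker


    def generate_marked_entries(data_batches):
        # One entry per grouping key, then the timer marker, then completion,
        # so the downstream code is driven purely by pulling this iterator.
        for key, group in groupby(data_batches, key=lambda x: x[0]):
            yield (TransformWithStateInPandasFuncMode.PROCESS_DATA, key, group)
        yield (TransformWithStateInPandasFuncMode.PROCESS_TIMER, None, None)
        yield (TransformWithStateInPandasFuncMode.COMPLETE, None, None)


    def drive(entries, handle_data, handle_timers, finish):
        # By the time the timer marker is pulled, every per-key call has
        # already run; the same holds again for the completion marker.
        for mode, key, group in entries:
            if mode is TransformWithStateInPandasFuncMode.PROCESS_DATA:
                yield from handle_data(key, group)
            elif mode is TransformWithStateInPandasFuncMode.PROCESS_TIMER:
                yield from handle_timers()
            else:
                finish()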



##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   If you agree with this, please pick the commit above. You've already gone through some commits and I can't partially revert by myself.
   
   My fork is public, so you can add my repo as a remote, fetch the branch, and cherry-pick the commit into this PR branch (resolving the merge conflict). I'd recommend taking a different route altogether: perform a "hard reset" to my commit in this PR branch (`git reset --hard f8952b213ba7f2cbfbc78ef145552317812e9f9b`), and then add more commits to address the other review comments.






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-26 Thread via GitHub


jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1859577741


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
     def dump_stream(self, iterator, stream):
         """
-        Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow
-        RecordBatches, and write batches to stream.
+        Read through chained return results from a single partition of handleInputRows.
+        For a single partition, after finish handling all input rows, we need to iterate
+        through all expired timers and handle them. We chain the results of handleInputRows
+        with handleExpiredTimer into a single iterator and dump the stream as arrow batches.
         """
-        result = [(b, t) for x in iterator for y, t in x for b in y]
-        super().dump_stream(result, stream)
+
+        from itertools import tee, chain
+        from pyspark.sql.streaming.stateful_processor_api_client import (
+            StatefulProcessorHandleState,
+        )
+        from pyspark.sql.streaming.stateful_processor import (
+            ExpiredTimerInfo,
+            TimerValues,
+        )
+
+        # Clone the original iterator to get additional args
+        cloned_iterator, result_iterator = tee(iterator)
+        result = [(pd, t) for x in cloned_iterator for y, t in x for pd in y[0]]
+        args = [(y[1], y[2], t, y[3]) for x in result_iterator for y, t in x]
+
+        # if num of keys is smaller than num of partitions, some partitions will have empty
+        # input rows; we do nothing for such partitions
+        if len(args) == 0:
+            return
+
+        # all keys on the same partition share the same args
+        statefulProcessorApiClient = args[0][0]
+        statefulProcessor = args[0][1]
+        outputType = args[0][2]
+        timeMode = args[0][3]
+
+        batch_timestamp, watermark_timestamp = statefulProcessorApiClient.get_timestamps()
+
+        result_iter_list = []
+        if timeMode.lower() == "processingtime":
+            expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
+                batch_timestamp
+            )
+        elif timeMode.lower() == "eventtime":
+            expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
+                watermark_timestamp
+            )
+        else:
+            expiry_list_iter = iter([[]])
+
+        def timer_iter_wrapper(func, *args, **kwargs):

Review Comment:
   Moved it to just below where `statefulProcessorApiClient` is initialized. We need to access this object from `timer_iter_wrapper`.
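   The wrapper body itself is not quoted in this hunk; purely as a hypothetical sketch of the idea (yield everything the handler produces, and only then advance the handle state), with the API client passed in explicitly here instead of captured from the enclosing `dump_stream` scope:

    from typing import Callable, Iterator

    from pyspark.sql.streaming.stateful_processor_api_client import (
        StatefulProcessorHandleState,
    )


    def timer_iter_wrapper(api_client, func: Callable[..., Iterator], *args, **kwargs) -> Iterator:
        # Emit every timer row the handler produces first...
        yield from func(*args, **kwargs)
        # ...then flip the phase, so the state changes only after all timer
        # rows for this partition have been consumed downstream.
        api_client.set_handle_state(StatefulProcessorHandleState.TIMER_PROCESSED)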






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-26 Thread via GitHub


jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1859574639


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   Added a `test_transform_with_state_with_timers_single_partition` to run all the timer suites with a single partition.
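   As a rough sketch of how a single-partition run can be forced (assuming the pyspark SQL test-case helpers used by this test file; not the PR's actual test body):

    def test_transform_with_state_with_timers_single_partition(self):
        # Pin shuffle partitions to 1 so every grouping key - and therefore
        # every registered timer - lands on the same partition in a batch.
        with self.sql_conf({"spark.sql.shuffle.partitions": "1"}):
            pass  # re-run the processing-time and event-time timer suites here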
   






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-26 Thread via GitHub


jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1859367581


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   TLDR: if we put the timer handling code inside `if key is None`, we will add more code complexity.
   
   We need to make a tradeoff on whether to add the complication to `serializer.py` or to `TransformWithStateInPandasPythonRunner` if we put the above timer handling code inside `if key is None`.
   If we put the timer handling logic inside `if key is None`, we will need to call `dump_stream()` again in the `finally` block here: https://github.com/apache/spark/blob/master/python/pyspark/worker.py#L1966. Calling `dump_stream()` twice means we need to properly handle how the JVM receives batches. Currently we reuse the `read()` function inside `PythonArrowOutput`, and the reader stops reading when the Python `dump_stream` signals the end here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala#L118. Since we would now call `dump_stream()` twice, we would need to override this function in `TransformWithStateInPandasPythonRunner` and continue reading one more time after receiving the end signal. The extra complexity is that we would also need to handle the case where some partitions have no timer iterator and never start the additional dump stream writer, and decide how to handle exceptions if one of the `dump_stream` calls fails. Additionally, we need to set the handle state to `TIMER_PROCESSED` after all timer rows are processed, so we would need code changes inside `worker.py` to set this properly. I feel it is better to put all TWS related code changes in one place for better readability. This means we would need the `StatefulProcessorApiClient` object inside `worker.py` to set the state correctly, which implies similar code complexity to what we have now in `serializer.py` (return one extra `StatefulProcessorApiClient` from `transformWithStateWithInitStateUDF` and deserialize it from `out_iter`). We cannot set the `TIMER_PROCESSED` state in `group_ops.py` because the output rows iterator is not fully consumed there; it is fully consumed after `dump_stream` is called inside `worker.py`.
   
   So either way we will need to deal with extra complexity. I personally think putting the timer handling code into `serializer.py` is slightly better because it is more similar to how we handle timers on the Scala side - we chain the timer output rows after the data handling rows into a single iterator.
   
   Let me know if you have suggestions on which way is better.
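   As a minimal illustration of the chaining mentioned in the last paragraph (plain `itertools.chain`; the real code chains the per-key `handleInputRows` outputs with the per-partition `handleExpiredTimer` outputs):

    from itertools import chain

    def chain_partition_outputs(data_iters, timer_iters):
        # Timer rows are appended after all data rows, mirroring the Scala side.
        return chain(chain.from_iterable(data_iters), chain.from_iterable(timer_iters))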






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-25 Thread via GitHub


HeartSaVioR commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1857722320


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   I'll wait for the next update on whether my suggestion works or not. I think the complexity would be very different, hence I would like to defer further review until after that.






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-25 Thread via GitHub


HeartSaVioR commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1857706752


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   Any reason we can't do this in `if key is None:` in `transformWithStateUDF` 
and `transformWithStateWithInitStateUDF`?
   
   This was my suggestion, and I believe you can just retrieve the expired timers and timestamps, call handleExpiredTimer() with that information, and be done. I don't think this complication is necessary - if we can't do this in `key is None` for some reason, I suspect fixing that would be much easier.
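   A toy, self-contained sketch of the control flow being suggested here: per-key calls run first, and a trailing `None` key acts as the signal to expire timers once for the whole partition (all names and shapes below are illustrative, not the PR's API):

    from typing import Dict, Iterator, List, Optional, Tuple

    def run_partition(
        groups: List[Tuple[Optional[str], List[int]]],
        registered_timers: Dict[str, int],
        current_ts: int,
    ) -> Iterator[str]:
        # Toy model: data groups are followed by a (None, []) sentinel entry.
        for key, rows in groups:
            if key is None:
                # Every per-key call has already run, so all timer
                # registrations are visible and can be expired in one pass.
                for timer_key, ts in registered_timers.items():
                    if ts <= current_ts:
                        yield f"{timer_key}-expired"
                return
            registered_timers[key] = current_ts  # pretend the handler set a timer
            yield f"{key}-handled-{len(rows)}-rows"

    # The sentinel arrives only after keys "0" and "3" were handled, so both
    # of their timers are seen by the expiry pass:
    out = list(run_partition([("0", [1]), ("3", [2]), (None, [])], {}, current_ts=10))
    # out == ['0-handled-1-rows', '3-handled-1-rows', '0-expired', '3-expired']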






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-25 Thread via GitHub


HeartSaVioR commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1857683908


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   +1 to verifying this explicitly in a test.






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-25 Thread via GitHub


bogao007 commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1857583249


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   Maybe better to include what the structure of the input `iterator` looks like, given we have added a bunch of new objects to the UDF output. Either add it here or down below where `args` is defined.



##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   Thanks @jingz-db for the detailed explanation! Do you think we should add a test case where multiple keys expire in the same partition? E.g. we could either set the partition number to 1 or increase the input to have more keys.



##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
     def dump_stream(self, iterator, stream):
         """
-        Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow
-        RecordBatches, and write batches to stream.
+        Read through chained return results from a single partition of handleInputRows.
+        For a single partition, after finish handling all input rows, we need to iterate
+        through all expired timers and handle them. We chain the results of handleInputRows
+        with handleExpiredTimer into a single iterator and dump the stream as arrow batches.
        """
-        result = [(b, t) for x in iterator for y, t in x for b in y]
-        super().dump_stream(result, stream)
+
+        from itertools import tee, chain
+        from pyspark.sql.streaming.stateful_processor_api_client import (
+            StatefulProcessorHandleState,
+        )
+        from pyspark.sql.streaming.stateful_processor import (
+            ExpiredTimerInfo,
+            TimerValues,
+        )
+
+        # Clone the original iterator to get additional args
+        cloned_iterator, result_iterator = tee(iterator)
+        result = [(pd, t) for x in cloned_iterator for y, t in x for pd in y[0]]
+        args = [(y[1], y[2], t, y[3]) for x in result_iterator for y, t in x]
+
+        # if num of keys is smaller than num of partitions, some partitions will have empty
+        # input rows; we do nothing for such partitions
+        if len(args) == 0:
+            return
+
+        # all keys on the same partition share the same args
+        statefulProcessorApiClient = args[0][0]
+        statefulProcessor = args[0][1]
+        outputType = args[0][2]
+        timeMode = args[0][3]
+
+        batch_timestamp, watermark_timestamp = statefulProcessorApiClient.get_timestamps()
+
+        result_iter_list = []
+        if timeMode.lower() == "processingtime":
+            expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
+                batch_timestamp
+            )
+        elif timeMode.lower() == "eventtime":
+            expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
+                watermark_timestamp
+            )
+        else:
+            expiry_list_iter = iter([[]])
+
+        def timer_iter_wrapper(func, *args, **kwargs):

Review Comment:
   Nit: can we move this method definition to the top of `dump_stream` to 
follow the same pattern in this file? This would also make the code easier to 
read.






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-25 Thread via GitHub


jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1857470772


##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -502,53 +502,59 @@ def transformWithStateInPandas(
         if isinstance(outputStructType, str):
             outputStructType = cast(StructType, _parse_datatype_string(outputStructType))
 
-        def handle_data_with_timers(
+        def get_timestamps(
             statefulProcessorApiClient: StatefulProcessorApiClient,
-            key: Any,
-            inputRows: Iterator["PandasDataFrameLike"],
-        ) -> Iterator["PandasDataFrameLike"]:
-            statefulProcessorApiClient.set_implicit_key(key)
+        ) -> Tuple[int, int]:
             if timeMode != "none":
                 batch_timestamp = statefulProcessorApiClient.get_batch_timestamp()
                 watermark_timestamp = statefulProcessorApiClient.get_watermark_timestamp()
             else:
                 batch_timestamp = -1
                 watermark_timestamp = -1
-            # process with invalid expiry timer info and emit data rows
-            data_iter = statefulProcessor.handleInputRows(
-                key,
-                inputRows,
-                TimerValues(batch_timestamp, watermark_timestamp),
-                ExpiredTimerInfo(False),
-            )
-            statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.DATA_PROCESSED)
+            return batch_timestamp, watermark_timestamp
+
+        def handle_data_with_timers(
+            statefulProcessorApiClient: StatefulProcessorApiClient,
+            key: Any,
+            batch_timestamp: int,
+            watermark_timestamp: int,
+            inputRows: Optional[Iterator["PandasDataFrameLike"]] = None,
+        ) -> Iterator["PandasDataFrameLike"]:
+            statefulProcessorApiClient.set_implicit_key(key)
+            # process with data rows
+            if inputRows is not None:
+                data_iter = statefulProcessor.handleInputRows(
+                    key, inputRows, TimerValues(batch_timestamp, watermark_timestamp)
+                )
+                result_iter_list = [data_iter]
+                statefulProcessorApiClient.set_handle_state(
+                    StatefulProcessorHandleState.DATA_PROCESSED
+                )
+            else:
+                result_iter_list = []
 
-            if timeMode == "processingtime":
+            if timeMode.lower() == "processingtime":
                 expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
                     batch_timestamp
                 )
-            elif timeMode == "eventtime":
+            elif timeMode.lower() == "eventtime":
                 expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
                     watermark_timestamp
                 )
             else:
                 expiry_list_iter = iter([[]])
 
-            result_iter_list = [data_iter]
-            # process with valid expiry time info and with empty input rows,
-            # only timer related rows will be emitted
+            # process with expiry timers, only timer related rows will be emitted

Review Comment:
   Left an explanation of what is causing the correctness issue in my prior 
implementation here just in case you are curious: 
https://github.com/apache/spark/pull/48838#discussion_r1857466729






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-25 Thread via GitHub


jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1857469606


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   I have now moved `handleExpiredTimer` inside `serializer.py`, so `get_expiry_timers_iterator()` will be called after `handleInitialState()` has executed for all keys on the partition, and it is also chained after `handleInputRows()` has been called for all keys on the same partition.






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-25 Thread via GitHub


jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1857466729


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   In my prior implementation, a correctness issue happens if multiple keys expire on a single partition. E.g. the test case `test_transform_with_state_init_state_with_timers` will fail if we set the number of partitions to "1".
   Previously we called `get_expiry_timers_iterator()` and `handleExpiredTimer()` in `group_ops.py` inside the UDF, which is called per key. So we register a timer for key "0" inside `handleInitialState()` and then enter `get_expiry_timers_iterator()`. Because the UDF for key "3" has not been called yet at that point, the timer for key "3" is not registered. We will only see key "0" expire and will only get `Row(id="0-expired")` in the output of the first batch. When we then enter the UDF for key "3", since `TransformWithStateInPandasStateServer` [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala#L217) enforces that expiryTimestampIter is only consumed once per partition, the JVM will return none for key "3" because this iterator was already consumed for key "0". This is the correctness issue.
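   A tiny, self-contained illustration of this failure mode - a per-partition, consume-once expiry source misses timers registered by keys whose UDF runs later (all names here are stand-ins, not the real state server API):

    from typing import Dict, Iterator, List, Tuple

    class ToyPartitionTimerState:
        """Stand-in for the JVM state server: the expiry iterator may only be
        consumed once per partition, as described above."""

        def __init__(self) -> None:
            self.timers: Dict[str, int] = {}
            self._expiry_consumed = False

        def register(self, key: str, ts: int) -> None:
            self.timers[key] = ts

        def get_expiry_timers_iterator(self, watermark: int) -> Iterator[Tuple[str, int]]:
            if self._expiry_consumed:
                return iter([])  # second consumption returns nothing
            self._expiry_consumed = True
            return iter([(k, t) for k, t in self.timers.items() if t <= watermark])

    # Buggy shape: expiry is drained inside each per-key UDF call.
    state = ToyPartitionTimerState()
    expired: List[str] = []
    for key in ["0", "3"]:
        state.register(key, ts=5)            # timer registered for this key
        expired += [k for k, _ in state.get_expiry_timers_iterator(watermark=10)]
    # expired == ['0'] - key "3" is missed, which is the bug described above.

    # Fixed shape: register for all keys first, drain expiry once per partition.
    state2 = ToyPartitionTimerState()
    for key in ["0", "3"]:
        state2.register(key, ts=5)
    # [k for k, _ in state2.get_expiry_timers_iterator(watermark=10)] == ['0', '3']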






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-25 Thread via GitHub


jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1857459475


##
python/pyspark/sql/pandas/serializers.py:
##
@@ -1201,11 +1201,89 @@ def generate_data_batches(batches):
 
 def dump_stream(self, iterator, stream):
 """
-Read through an iterator of (iterator of pandas DataFrame), serialize 
them to Arrow
-RecordBatches, and write batches to stream.
+Read through chained return results from a single partition of 
handleInputRows.

Review Comment:
   @bogao007 Could you revisit this change? It has changed since the last time you reviewed because I found a correctness bug in my prior timer change. Thanks!






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-25 Thread via GitHub


jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1857426167


##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -502,53 +502,59 @@ def transformWithStateInPandas(
         if isinstance(outputStructType, str):
             outputStructType = cast(StructType, _parse_datatype_string(outputStructType))
 
-        def handle_data_with_timers(
+        def get_timestamps(
            statefulProcessorApiClient: StatefulProcessorApiClient,
-            key: Any,
-            inputRows: Iterator["PandasDataFrameLike"],
-        ) -> Iterator["PandasDataFrameLike"]:
-            statefulProcessorApiClient.set_implicit_key(key)
+        ) -> Tuple[int, int]:
             if timeMode != "none":
                 batch_timestamp = statefulProcessorApiClient.get_batch_timestamp()
                 watermark_timestamp = statefulProcessorApiClient.get_watermark_timestamp()
             else:
                 batch_timestamp = -1
                 watermark_timestamp = -1
-            # process with invalid expiry timer info and emit data rows
-            data_iter = statefulProcessor.handleInputRows(
-                key,
-                inputRows,
-                TimerValues(batch_timestamp, watermark_timestamp),
-                ExpiredTimerInfo(False),
-            )
-            statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.DATA_PROCESSED)
+            return batch_timestamp, watermark_timestamp
+
+        def handle_data_with_timers(
+            statefulProcessorApiClient: StatefulProcessorApiClient,
+            key: Any,
+            batch_timestamp: int,
+            watermark_timestamp: int,
+            inputRows: Optional[Iterator["PandasDataFrameLike"]] = None,
+        ) -> Iterator["PandasDataFrameLike"]:
+            statefulProcessorApiClient.set_implicit_key(key)
+            # process with data rows
+            if inputRows is not None:
+                data_iter = statefulProcessor.handleInputRows(
+                    key, inputRows, TimerValues(batch_timestamp, watermark_timestamp)
+                )
+                result_iter_list = [data_iter]
+                statefulProcessorApiClient.set_handle_state(
+                    StatefulProcessorHandleState.DATA_PROCESSED
+                )
+            else:
+                result_iter_list = []
 
-            if timeMode == "processingtime":
+            if timeMode.lower() == "processingtime":
                 expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
                     batch_timestamp
                 )
-            elif timeMode == "eventtime":
+            elif timeMode.lower() == "eventtime":
                 expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
                     watermark_timestamp
                 )
             else:
                 expiry_list_iter = iter([[]])
 
-            result_iter_list = [data_iter]
-            # process with valid expiry time info and with empty input rows,
-            # only timer related rows will be emitted
+            # process with expiry timers, only timer related rows will be emitted

Review Comment:
   Thanks so much for catching this! I made a terrible correctness bug in my prior timer implementation. I have now moved all the timer handling code into `serializer.py`, where expired timers are processed per partition.






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-19 Thread via GitHub


HeartSaVioR commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1849549495


##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -573,7 +579,11 @@ def transformWithStateUDF(
 statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.CLOSED)
 return iter([])
 
-result = handle_data_with_timers(statefulProcessorApiClient, key, inputRows)
+batch_timestamp, watermark_timestamp = get_timestamps(statefulProcessorApiClient)

Review Comment:
   Ideally this shouldn't be called for every key. If we split the handling of 
timer expiration out from the handling of input rows, we would only need to 
call this once.
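
   As a rough illustration of what I mean (just a sketch with illustrative 
names; the cache could equally be threaded through as an argument or live on 
the serializer), the timestamps could be fetched lazily a single time and then 
reused for every key:

```python
from typing import Optional, Tuple

# one-shot cache so the timestamps are fetched once per batch/partition
# instead of once per key (illustrative helper, not the PR's exact code)
_cached_timestamps: Optional[Tuple[int, int]] = None

def get_timestamps_once(api_client, time_mode: str) -> Tuple[int, int]:
    global _cached_timestamps
    if _cached_timestamps is None:
        if time_mode != "none":
            _cached_timestamps = (
                api_client.get_batch_timestamp(),
                api_client.get_watermark_timestamp(),
            )
        else:
            _cached_timestamps = (-1, -1)
    return _cached_timestamps
```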






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-19 Thread via GitHub


HeartSaVioR commented on PR #48838:
URL: https://github.com/apache/spark/pull/48838#issuecomment-2487478749

   I'll revisit the PR once my comments are addressed (or @jingz-db has a 
reasonable point for not doing this), as my proposal would change the code 
non-trivially.





Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-19 Thread via GitHub


HeartSaVioR commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1849528014


##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -502,53 +502,59 @@ def transformWithStateInPandas(
 if isinstance(outputStructType, str):
 outputStructType = cast(StructType, _parse_datatype_string(outputStructType))
 
-def handle_data_with_timers(
+def get_timestamps(
 statefulProcessorApiClient: StatefulProcessorApiClient,
-key: Any,
-inputRows: Iterator["PandasDataFrameLike"],
-) -> Iterator["PandasDataFrameLike"]:
-statefulProcessorApiClient.set_implicit_key(key)
+) -> Tuple[int, int]:
 if timeMode != "none":
 batch_timestamp = statefulProcessorApiClient.get_batch_timestamp()
 watermark_timestamp = statefulProcessorApiClient.get_watermark_timestamp()
 else:
 batch_timestamp = -1
 watermark_timestamp = -1
-# process with invalid expiry timer info and emit data rows
-data_iter = statefulProcessor.handleInputRows(
-key,
-inputRows,
-TimerValues(batch_timestamp, watermark_timestamp),
-ExpiredTimerInfo(False),
-)
-statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.DATA_PROCESSED)
+return batch_timestamp, watermark_timestamp
+
+def handle_data_with_timers(
+statefulProcessorApiClient: StatefulProcessorApiClient,
+key: Any,
+batch_timestamp: int,
+watermark_timestamp: int,
+inputRows: Optional[Iterator["PandasDataFrameLike"]] = None,
+) -> Iterator["PandasDataFrameLike"]:
+statefulProcessorApiClient.set_implicit_key(key)
+# process with data rows
+if inputRows is not None:
+data_iter = statefulProcessor.handleInputRows(
+key, inputRows, TimerValues(batch_timestamp, watermark_timestamp)
+)
+result_iter_list = [data_iter]
+statefulProcessorApiClient.set_handle_state(
+StatefulProcessorHandleState.DATA_PROCESSED
+)
+else:
+result_iter_list = []
 
-if timeMode == "processingtime":
+if timeMode.lower() == "processingtime":
 expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
 batch_timestamp
 )
-elif timeMode == "eventtime":
+elif timeMode.lower() == "eventtime":
 expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator(
 watermark_timestamp
 )
 else:
 expiry_list_iter = iter([[]])
 
-result_iter_list = [data_iter]
-# process with valid expiry time info and with empty input rows,
-# only timer related rows will be emitted
+# process with expiry timers, only timer related rows will be emitted

Review Comment:
   I get confused about this every time. Is this relying on the behavior that 
an expired timer is removed once it fires, so we won't list the same timer as 
expired multiple times? That is very easy to forget.
   
   Is there any way we can just move this out and do it after we process all 
input? Could this be done in 
transformWithStateUDF/transformWithStateWithInitStateUDF with key = null?
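
   Something along these lines is what I have in mind (very rough sketch; the 
argument shapes and the rows yielded by `get_expiry_timers_iterator` are 
simplified here): a `None` key would mark the end-of-partition call, so timer 
expiration runs exactly once after all keyed input has been handled.

```python
def transform_with_state_udf_sketch(
    api_client, processor, key, input_rows, timer_values, watermark_timestamp
):
    # hypothetical control flow: key is None only for the final end-of-partition call
    if key is None:
        # all keyed input has been processed; expire timers exactly once here
        for timer_batch in api_client.get_expiry_timers_iterator(watermark_timestamp):
            for expired_key, expiry_ts in timer_batch:
                api_client.set_implicit_key(expired_key)
                # the real API wraps expiry_ts in an ExpiredTimerInfo; simplified here
                yield from processor.handleExpiredTimer(expired_key, timer_values, expiry_ts)
        return
    api_client.set_implicit_key(key)
    yield from processor.handleInputRows(key, input_rows, timer_values)
```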






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-18 Thread via GitHub


jingz-db commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1847130490


##
python/pyspark/sql/streaming/stateful_processor.py:
##
@@ -420,10 +411,27 @@ def handleInputRows(
 timer_values: TimerValues
   Timer value for the current batch that process the input rows.
   Users can get the processing or event time timestamp from TimerValues.
+"""
+return iter([])
+
+def handleExpiredTimer(

Review Comment:
   Correct. Added a comment line in the docstring to explicitly say this is 
optional to implement.
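
   In other words, the shape is roughly the following (simplified sketch; the 
parameter list is abbreviated from the actual signature):

```python
from typing import Any, Iterator

class StatefulProcessor:  # simplified stand-in for the real base class
    def handleExpiredTimer(
        self, key: Any, timer_values: Any, expired_timer_info: Any
    ) -> Iterator[Any]:
        """
        Optional to implement. Invoked for each timer registered for `key` that
        has expired in the current batch. The default implementation emits
        nothing, so processors that never register timers can simply ignore it.
        """
        return iter([])
```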



##
python/pyspark/sql/streaming/stateful_processor.py:
##
@@ -420,10 +411,27 @@ def handleInputRows(
 timer_values: TimerValues
   Timer value for the current batch that process the input rows.
   Users can get the processing or event time timestamp from TimerValues.
+"""
+return iter([])

Review Comment:
   Sorry, I confused this with `handleExpiredTimer`. Changed it back to `...`.






Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]

2024-11-15 Thread via GitHub


bogao007 commented on code in PR #48838:
URL: https://github.com/apache/spark/pull/48838#discussion_r1844724391


##
python/pyspark/sql/streaming/stateful_processor.py:
##
@@ -420,10 +411,27 @@ def handleInputRows(
 timer_values: TimerValues
   Timer value for the current batch that process the input rows.
   Users can get the processing or event time timestamp from TimerValues.
+"""
+return iter([])

Review Comment:
   Why do we change the `...` placeholder here?



##
python/pyspark/sql/streaming/stateful_processor.py:
##
@@ -420,10 +411,27 @@ def handleInputRows(
 timer_values: TimerValues
   Timer value for the current batch that process the input rows.
   Users can get the processing or event time timestamp from TimerValues.
+"""
+return iter([])
+
+def handleExpiredTimer(

Review Comment:
   Just double-checking: this method is not required for users to implement, 
correct?



##
python/pyspark/sql/pandas/group_ops.py:
##
@@ -573,7 +568,16 @@ def transformWithStateUDF(
 statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.CLOSED)
 return iter([])
 
-result = handle_data_with_timers(statefulProcessorApiClient, key, inputRows)
+if timeMode != "none":
+batch_timestamp = statefulProcessorApiClient.get_batch_timestamp()
+watermark_timestamp = statefulProcessorApiClient.get_watermark_timestamp()
+else:
+batch_timestamp = -1
+watermark_timestamp = -1

Review Comment:
   Can we abstract this into a separate method shared by both UDFs to reduce 
the redundant code?
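
   Something like the sketch below is what I mean (illustrative only; the 
parameter names are mine):

```python
from typing import Tuple

def get_timestamps(api_client, time_mode: str) -> Tuple[int, int]:
    # shared by transformWithStateUDF and transformWithStateWithInitStateUDF
    if time_mode != "none":
        batch_timestamp = api_client.get_batch_timestamp()
        watermark_timestamp = api_client.get_watermark_timestamp()
    else:
        batch_timestamp = -1
        watermark_timestamp = -1
    return batch_timestamp, watermark_timestamp
```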


