Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
HeartSaVioR commented on PR #48838: URL: https://github.com/apache/spark/pull/48838#issuecomment-2505359630 CI has passed: https://github.com/jingz-db/spark/runs/33636466911 Thanks! Merging to master.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
HeartSaVioR closed pull request #48838: [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer URL: https://github.com/apache/spark/pull/48838
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
HeartSaVioR commented on PR #48838: URL: https://github.com/apache/spark/pull/48838#issuecomment-2505246642 I just pushed a commit addressing my own review comments as well as the linter failure. These are nits, so I don't think they matter.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
HeartSaVioR commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1861444940
## python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py: ## @@ -558,14 +559,19 @@ def prepare_batch3(input_path): def test_transform_with_state_in_pandas_event_time(self): def check_results(batch_df, batch_id): if batch_id == 0: -assert set(batch_df.sort("id").collect()) == {Row(id="a", timestamp="20")} -elif batch_id == 1: +# check timer registered in the same batch is expired
Review Comment: nit: let's comment on the `watermark for late event` and `watermark for eviction` per batch, to help verify the output. E.g. in batch_id == 1, the watermark for eviction is 10 but the watermark for late event is 0, hence 4 is accepted. The timestamp in the expired row follows the `watermark for eviction` value, so noting that is also helpful.
## python/pyspark/sql/pandas/serializers.py: ## @@ -1197,7 +1198,11 @@ def generate_data_batches(batches): data_batches = generate_data_batches(_batches) for k, g in groupby(data_batches, key=lambda x: x[0]): -yield (k, g) +yield (TransformWithStateInPandasFuncMode.PROCESS_DATA, k, g)
Review Comment: nit: this looks inconsistent - here we build the tuple with explicit `()`, while the class below doesn't use `()`. Not a huge deal if the linter does not complain, but while we are here (the linter is failing)...
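To make the nit concrete, a hedged sketch of the per-batch commenting style being suggested for `check_results` (the batch-1 watermark values come from the comment above; the batch-0 values and the expected rows are placeholders, not the actual test contents):

```python
def check_results(batch_df, batch_id):
    if batch_id == 0:
        # watermark for late event = 0, watermark for eviction = 0 (assumed initial values);
        # check that the timer registered in this same batch has expired
        ...  # expected rows for batch 0 omitted here
    elif batch_id == 1:
        # watermark for late event = 0, watermark for eviction = 10,
        # hence the late row with event time 4 is still accepted; the timestamp
        # in the expired row follows the watermark for eviction (10)
        ...  # expected rows for batch 1 omitted here
```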
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
jingz-db commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1861339701
## python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py: ## @@ -858,33 +913,30 @@ def init(self, handle: StatefulProcessorHandle) -> None: self.handle = handle self.max_state = handle.getValueState("max_state", state_schema) -def handleInputRows( -self, key, rows, timer_values, expired_timer_info -) -> Iterator[pd.DataFrame]: -if expired_timer_info.is_valid(): -self.max_state.clear() -self.handle.deleteTimer(expired_timer_info.get_expiry_time_in_ms()) -str_key = f"{str(key[0])}-expired" -yield pd.DataFrame( -{"id": (str_key,), "timestamp": str(expired_timer_info.get_expiry_time_in_ms())} -) +def handleExpiredTimer(self, key, timer_values, expired_timer_info) -> Iterator[pd.DataFrame]: +self.max_state.clear() +self.handle.deleteTimer(expired_timer_info.get_expiry_time_in_ms()) +str_key = f"{str(key[0])}-expired" +yield pd.DataFrame( +{"id": (str_key,), "timestamp": str(expired_timer_info.get_expiry_time_in_ms())} +) -else: -timestamp_list = [] -for pdf in rows: -# int64 will represent timestamp in nanosecond, restore to second -timestamp_list.extend((pdf["eventTime"].astype("int64") // 10**9).tolist()) +def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]: +timestamp_list = [] +for pdf in rows: +# int64 will represent timestamp in nanosecond, restore to second +timestamp_list.extend((pdf["eventTime"].astype("int64") // 10**9).tolist()) -if self.max_state.exists(): -cur_max = int(self.max_state.get()[0]) -else: -cur_max = 0 -max_event_time = str(max(cur_max, max(timestamp_list))) +if self.max_state.exists(): +cur_max = int(self.max_state.get()[0]) +else: +cur_max = 0 +max_event_time = str(max(cur_max, max(timestamp_list))) -self.max_state.update((max_event_time,)) - self.handle.registerTimer(timer_values.get_current_watermark_in_ms()) +self.max_state.update((max_event_time,)) +self.handle.registerTimer(timer_values.get_current_watermark_in_ms() + 1)
Review Comment: We should also check that a timer can expire in the same batch. So I am keeping the event-time suite with the timer expiring in the same batch, and registering a future timestamp in the processing-time suite.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
jingz-db commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1861302160
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: Thanks for putting out the commit! I cherry-picked your change and this is now looking much cleaner!
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
jingz-db commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1861301227
## python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py: ## @@ -858,33 +913,30 @@ def init(self, handle: StatefulProcessorHandle) -> None: self.handle = handle self.max_state = handle.getValueState("max_state", state_schema) -def handleInputRows( -self, key, rows, timer_values, expired_timer_info -) -> Iterator[pd.DataFrame]: -if expired_timer_info.is_valid(): -self.max_state.clear() -self.handle.deleteTimer(expired_timer_info.get_expiry_time_in_ms()) -str_key = f"{str(key[0])}-expired" -yield pd.DataFrame( -{"id": (str_key,), "timestamp": str(expired_timer_info.get_expiry_time_in_ms())} -) +def handleExpiredTimer(self, key, timer_values, expired_timer_info) -> Iterator[pd.DataFrame]: +self.max_state.clear() +self.handle.deleteTimer(expired_timer_info.get_expiry_time_in_ms()) +str_key = f"{str(key[0])}-expired" +yield pd.DataFrame( +{"id": (str_key,), "timestamp": str(expired_timer_info.get_expiry_time_in_ms())} +) -else: -timestamp_list = [] -for pdf in rows: -# int64 will represent timestamp in nanosecond, restore to second -timestamp_list.extend((pdf["eventTime"].astype("int64") // 10**9).tolist()) +def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]: +timestamp_list = [] +for pdf in rows: +# int64 will represent timestamp in nanosecond, restore to second +timestamp_list.extend((pdf["eventTime"].astype("int64") // 10**9).tolist()) -if self.max_state.exists(): -cur_max = int(self.max_state.get()[0]) -else: -cur_max = 0 -max_event_time = str(max(cur_max, max(timestamp_list))) +if self.max_state.exists(): +cur_max = int(self.max_state.get()[0]) +else: +cur_max = 0 +max_event_time = str(max(cur_max, max(timestamp_list))) -self.max_state.update((max_event_time,)) - self.handle.registerTimer(timer_values.get_current_watermark_in_ms()) +self.max_state.update((max_event_time,)) +self.handle.registerTimer(timer_values.get_current_watermark_in_ms() + 1)
Review Comment: I modified this to the current batch timestamp + 1 to test a more common use case, since registering with the current batch timestamp is not a very common use case.
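For readers following the flattened diff above, a reconstruction of the new processor shape (indentation and the class shell are approximated from the `+` lines; `state_schema` and the rest of the test harness are not shown):

```python
import pandas as pd
from typing import Iterator

state_schema = None  # placeholder; the real test defines a schema for the value state


class EventTimeProcessor:  # illustrative name, not necessarily the test's class name
    def init(self, handle) -> None:
        self.handle = handle
        self.max_state = handle.getValueState("max_state", state_schema)

    def handleExpiredTimer(self, key, timer_values, expired_timer_info) -> Iterator[pd.DataFrame]:
        # clear state, drop the expired timer, and emit a "<key>-expired" row
        self.max_state.clear()
        self.handle.deleteTimer(expired_timer_info.get_expiry_time_in_ms())
        str_key = f"{str(key[0])}-expired"
        yield pd.DataFrame(
            {"id": (str_key,), "timestamp": str(expired_timer_info.get_expiry_time_in_ms())}
        )

    def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
        timestamp_list = []
        for pdf in rows:
            # int64 carries the timestamp in nanoseconds; restore it to seconds
            timestamp_list.extend((pdf["eventTime"].astype("int64") // 10**9).tolist())
        cur_max = int(self.max_state.get()[0]) if self.max_state.exists() else 0
        max_event_time = str(max(cur_max, max(timestamp_list)))
        self.max_state.update((max_event_time,))
        # register a timer one millisecond past the current watermark, so it can
        # expire within the same batch (see the discussion above)
        self.handle.registerTimer(timer_values.get_current_watermark_in_ms() + 1)
        # the real test yields an output row here; it falls outside the quoted hunk
        yield from ()
```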
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
HeartSaVioR commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1860396150
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: https://github.com/HeartSaVioR/spark/commit/f8952b213ba7f2cbfbc78ef145552317812e9f9b This is the implementation of my suggestion, based on 0c5ab3f. I've confirmed that `pyspark.sql.tests.pandas.test_pandas_transform_with_state` passes with this change - I haven't added the new tests you added later, though. I think this is a lot simpler - we just add two markers to the input iterator which carry over the mode, and the flow does not change at all. No tricks with teeing and chaining iterators, minimal changes to the data structure, etc. How does this work? It is the same way we use iterators in the Scala codebase: we pull one entry, process it, produce output, and pull the next entry. The generator has one entry for every grouping key, then the marker for timers, and then the marker for completion. Each entry calls the function which eventually calls the user function; the user function is expected to return an iterator, but the logic that produces the iterator should be synchronous (no async and no laziness, otherwise I guess it could even fail without my change). So by the time the timer marker is evaluated, the function calls for all grouping keys must already have been made. Same for the completion marker. This is the same as the Scala implementation. As a side effect, updating the phase is corrected in this commit.
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: If you agree with this, please pick up the commit above. You've already gone through some commits and I can't revert partially by myself. My fork is public, so you can add my repo as a remote, fetch the branch, and cherry-pick the commit into this PR branch, resolving the merge conflict. I'd actually recommend a different way: perform a "hard reset" to my commit in this PR branch (`git reset --hard f8952b213ba7f2cbfbc78ef145552317812e9f9b`), and then add more commits to address the other review comments.
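A minimal, self-contained model of the marker idea described above. `PROCESS_DATA` appears in the serializer diff earlier in this thread; the other mode names and the plumbing here are assumptions, not the actual `TransformWithStateInPandasFuncMode` implementation:

```python
from enum import Enum
from itertools import groupby


class FuncMode(Enum):  # stand-in for TransformWithStateInPandasFuncMode
    PROCESS_DATA = 1
    PROCESS_TIMER = 2  # assumed marker name
    COMPLETE = 3       # assumed marker name


def generate_entries(data_batches):
    # one entry per grouping key, then a timer marker, then a completion marker
    for key, group in groupby(data_batches, key=lambda x: x[0]):
        yield (FuncMode.PROCESS_DATA, key, group)
    yield (FuncMode.PROCESS_TIMER, None, None)
    yield (FuncMode.COMPLETE, None, None)


def consume(entries):
    # the consumer pulls one entry at a time, synchronously, exactly like the
    # Scala iterator flow: by the time the timer marker arrives, every per-key
    # call has already run
    for mode, key, group in entries:
        if mode is FuncMode.PROCESS_DATA:
            print("handle input rows for key", key, list(group))
        elif mode is FuncMode.PROCESS_TIMER:
            print("handle expired timers once, after all keys")
        else:
            print("finalize: mark processing complete")


consume(generate_entries([("a", 1), ("a", 2), ("b", 3)]))
```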
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
jingz-db commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1859577741
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows. +For a single partition, after finish handling all input rows, we need to iterate +through all expired timers and handle them. We chain the results of handleInputRows +with handleExpiredTimer into a single iterator and dump the stream as arrow batches. """ -result = [(b, t) for x in iterator for y, t in x for b in y] -super().dump_stream(result, stream) + +from itertools import tee, chain +from pyspark.sql.streaming.stateful_processor_api_client import ( +StatefulProcessorHandleState, +) +from pyspark.sql.streaming.stateful_processor import ( +ExpiredTimerInfo, +TimerValues, +) + +# Clone the original iterator to get additional args +cloned_iterator, result_iterator = tee(iterator) +result = [(pd, t) for x in cloned_iterator for y, t in x for pd in y[0]] +args = [(y[1], y[2], t, y[3]) for x in result_iterator for y, t in x] + +# if num of keys is smaller than num of partitions, some partitions will have empty +# input rows; we do nothing for such partitions +if len(args) == 0: +return + +# all keys on the same partition share the same args +statefulProcessorApiClient = args[0][0] +statefulProcessor = args[0][1] +outputType = args[0][2] +timeMode = args[0][3] + +batch_timestamp, watermark_timestamp = statefulProcessorApiClient.get_timestamps() + +result_iter_list = [] +if timeMode.lower() == "processingtime": +expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator( +batch_timestamp +) +elif timeMode.lower() == "eventtime": +expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator( +watermark_timestamp +) +else: +expiry_list_iter = iter([[]]) + +def timer_iter_wrapper(func, *args, **kwargs):
Review Comment: Moved it to just below where `statefulProcessorApiClient` is initialized. We need to access this object from `timer_iter_wrapper`.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
jingz-db commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1859574639
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: Added `test_transform_with_state_with_timers_single_partition` to run all the timer suites with a single partition.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
jingz-db commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1859367581
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: TLDR: if we put the timer handling code inside `if key is None`, we end up with higher code complexity either way. We need to make a tradeoff about whether the complication lands in `serializer.py` or in `TransformWithStateInPandasPythonRunner` if we put the timer handling code in `if key is None`. If we put the timer handling logic inside `if key is None`, we would need to call `dump_stream()` again here in the `finally` code block: https://github.com/apache/spark/blob/master/python/pyspark/worker.py#L1966. Calling dump_stream() twice means we would need to properly handle how the JVM receives batches. Currently we reuse the `read()` function inside `PythonArrowOutput`, and the reader ends reading when the Python dump_stream signals the end here: https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/PythonArrowOutput.scala#L118. Since we would now be calling `dump_stream()` twice, we would need to override this function in `TransformWithStateInPandasPythonRunner` and continue reading one more time after receiving the end. The extra complexity is that we would also need to properly handle the case where some partitions have no timer iterator and never start the additional dump stream writer at all, and decide how to handle exceptions if one of the dump_stream calls fails. Additionally, we need to set the statefulHandlerState to `TIMER_PROCESSED` after all timer rows are processed, so we would need some code changes inside `worker.py` to set this properly. I feel like it is better to put all TWS related code changes in one place for better readability. That also means we would need to get the `StatefulProcessorHandlerApiClient` object inside `worker.py` to set the state correctly, which requires similar code complexity to what we have now in `serializer.py` (return one extra StatefulProcessorHandlerApiClient from `transformWithStateWithInitStateUDF` and deserialize it from `out_iter`). We cannot set the `TIMER_PROCESSED` state in `group_ops.py` because the output row iterator is not fully consumed there; it is fully consumed only after `dump_stream` is called inside `worker.py`. So either way we will need to deal with extra complexity. I personally think putting the timer handling code into `serializer.py` is slightly better because it is more similar to how we deal with timers on the Scala side - we chain the timer output rows after the data rows into a single iterator. Let me know if you have suggestions on which way is better.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
HeartSaVioR commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1857722320
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: I'll wait for the next update on whether my suggestion works or not. I think the complexity would be very different, hence I would like to defer further review until after that.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
HeartSaVioR commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1857706752
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: Any reason we can't do this in `if key is None:` in `transformWithStateUDF` and `transformWithStateWithInitStateUDF`? This was my suggestion, and I believe you can just retrieve the expired timers and timestamps, call handleExpiredTimer() with that information, and be done. I don't think this complication is necessary - if we can't do this in `key is None` for some reason, I suspect fixing that would be much easier.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
HeartSaVioR commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1857683908
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: +1 to verifying this explicitly in a test.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
bogao007 commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1857583249
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: Maybe better to include what the structure looks like for the input `iterator`, given that we have added a bunch of new objects to the UDF output. Either add it here or down below where `args` are defined.
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: Thanks @jingz-db for the detailed explanation! Do you think we should add a test case where multiple keys expire in the same partition? E.g. we could either set the partition num to 1 or increase the input to have more keys.
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows. +For a single partition, after finish handling all input rows, we need to iterate +through all expired timers and handle them. We chain the results of handleInputRows +with handleExpiredTimer into a single iterator and dump the stream as arrow batches. """ -result = [(b, t) for x in iterator for y, t in x for b in y] -super().dump_stream(result, stream) + +from itertools import tee, chain +from pyspark.sql.streaming.stateful_processor_api_client import ( +StatefulProcessorHandleState, +) +from pyspark.sql.streaming.stateful_processor import ( +ExpiredTimerInfo, +TimerValues, +) + +# Clone the original iterator to get additional args +cloned_iterator, result_iterator = tee(iterator) +result = [(pd, t) for x in cloned_iterator for y, t in x for pd in y[0]] +args = [(y[1], y[2], t, y[3]) for x in result_iterator for y, t in x] + +# if num of keys is smaller than num of partitions, some partitions will have empty +# input rows; we do nothing for such partitions +if len(args) == 0: +return + +# all keys on the same partition share the same args +statefulProcessorApiClient = args[0][0] +statefulProcessor = args[0][1] +outputType = args[0][2] +timeMode = args[0][3] + +batch_timestamp, watermark_timestamp = statefulProcessorApiClient.get_timestamps() + +result_iter_list = [] +if timeMode.lower() == "processingtime": +expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator( +batch_timestamp +) +elif timeMode.lower() == "eventtime": +expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator( +watermark_timestamp +) +else: +expiry_list_iter = iter([[]]) + +def timer_iter_wrapper(func, *args, **kwargs):
Review Comment: Nit: can we move this method definition to the top of `dump_stream` to follow the same pattern in this file? This would also make the code easier to read.
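On the first nit, a hedged sketch of what such a docstring could spell out; the positions are inferred from the comprehensions in the quoted diff (`y[0]`, `y[1]`, `y[2]`, `y[3]`, `t`) and should be treated as assumptions rather than the final documented contract:

```python
def dump_stream(self, iterator, stream):
    """
    Assumed input layout (inferred): each element of `iterator` is an iterable of
    (y, t) pairs, one per grouping key of the partition, where
        y[0] - iterator of pandas DataFrames returned by handleInputRows
        y[1] - StatefulProcessorApiClient shared by every key in the partition
        y[2] - the user's StatefulProcessor instance
        y[3] - timeMode ("none", "processingtime", or "eventtime")
        t    - the Arrow type used to serialize the output batches
    After all keys are handled, expired timers are processed once per partition and
    their output rows are chained behind the data rows before writing to `stream`.
    """
    ...
```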
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
jingz-db commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1857470772
## python/pyspark/sql/pandas/group_ops.py: ## @@ -502,53 +502,59 @@ def transformWithStateInPandas( if isinstance(outputStructType, str): outputStructType = cast(StructType, _parse_datatype_string(outputStructType)) -def handle_data_with_timers( +def get_timestamps( statefulProcessorApiClient: StatefulProcessorApiClient, -key: Any, -inputRows: Iterator["PandasDataFrameLike"], -) -> Iterator["PandasDataFrameLike"]: -statefulProcessorApiClient.set_implicit_key(key) +) -> Tuple[int, int]: if timeMode != "none": batch_timestamp = statefulProcessorApiClient.get_batch_timestamp() watermark_timestamp = statefulProcessorApiClient.get_watermark_timestamp() else: batch_timestamp = -1 watermark_timestamp = -1 -# process with invalid expiry timer info and emit data rows -data_iter = statefulProcessor.handleInputRows( -key, -inputRows, -TimerValues(batch_timestamp, watermark_timestamp), -ExpiredTimerInfo(False), -) - statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.DATA_PROCESSED) +return batch_timestamp, watermark_timestamp + +def handle_data_with_timers( +statefulProcessorApiClient: StatefulProcessorApiClient, +key: Any, +batch_timestamp: int, +watermark_timestamp: int, +inputRows: Optional[Iterator["PandasDataFrameLike"]] = None, +) -> Iterator["PandasDataFrameLike"]: +statefulProcessorApiClient.set_implicit_key(key) +# process with data rows +if inputRows is not None: +data_iter = statefulProcessor.handleInputRows( +key, inputRows, TimerValues(batch_timestamp, watermark_timestamp) +) +result_iter_list = [data_iter] +statefulProcessorApiClient.set_handle_state( +StatefulProcessorHandleState.DATA_PROCESSED +) +else: +result_iter_list = [] -if timeMode == "processingtime": +if timeMode.lower() == "processingtime": expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator( batch_timestamp ) -elif timeMode == "eventtime": +elif timeMode.lower() == "eventtime": expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator( watermark_timestamp ) else: expiry_list_iter = iter([[]]) -result_iter_list = [data_iter] -# process with valid expiry time info and with empty input rows, -# only timer related rows will be emitted +# process with expiry timers, only timer related rows will be emitted
Review Comment: Left an explanation of what caused the correctness issue in my prior implementation here, in case you are curious: https://github.com/apache/spark/pull/48838#discussion_r1857466729
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
jingz-db commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1857469606
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: I have now moved `handleExpiredTimer` inside `serializer.py`, so `get_expiry_timers_iterator()` is called after `handleInitialState()` has executed for all keys on the partition, and it is chained after `handleInputRows()` has been called for all keys on the same partition.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
jingz-db commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1857466729
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: In my prior implementation, a correctness issue occurs if multiple keys expire on a single partition. E.g. the test case `test_transform_with_state_init_state_with_timers` will fail if we set the partition number to "1". Previously we called `get_expiry_timers_iterator()` and `handleExpiredTimer()` in `group_ops.py` inside the UDF, which is called per key. So when we register a timer for key "0" inside `handleInitialState()`, we then enter `get_expiry_timers_iterator()`. Because the UDF for key "3" has not been called yet at that point, the timer for key "3" is not registered, so we only see key "0" expire and only get `Row(id="0-expired")` in the output of the first batch. When we then enter the UDF for key "3", since `TransformWithStateInPandasStateServer` [here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/python/TransformWithStateInPandasStateServer.scala#L217) enforces that expiryTimestampIter is consumed only once per partition, the JVM returns nothing for key "3" because the iterator was already consumed for key "0". Hence the correctness issue.
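A tiny standalone illustration of that failure mode - a partition-level iterator that gets drained inside the first per-key call leaves nothing for later keys (the names here are illustrative, not the actual TWS classes):

```python
def expiry_timers_for_partition():
    # pretend both key "0" and key "3" have timers that expired in this batch
    return iter([("0", 100), ("3", 100)])

shared_iter = expiry_timers_for_partition()  # built once per partition

def per_key_udf(key):
    # buggy pattern: every per-key call drains the shared partition-level iterator
    expired = [(k, ts) for k, ts in shared_iter if k == key]
    return [f"{k}-expired" for k, ts in expired]

print(per_key_udf("0"))  # ['0-expired'] - but this drains the iterator entirely
print(per_key_udf("3"))  # []            - key "3"'s expiration is silently lost
```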
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
jingz-db commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1857459475
## python/pyspark/sql/pandas/serializers.py: ## @@ -1201,11 +1201,89 @@ def generate_data_batches(batches): def dump_stream(self, iterator, stream): """ -Read through an iterator of (iterator of pandas DataFrame), serialize them to Arrow -RecordBatches, and write batches to stream. +Read through chained return results from a single partition of handleInputRows.
Review Comment: @bogao007 Could you revisit this change? It has changed since you last reviewed because I found a correctness bug in my prior timer change. Thanks!
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
jingz-db commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1857426167
## python/pyspark/sql/pandas/group_ops.py: ## @@ -502,53 +502,59 @@ def transformWithStateInPandas( if isinstance(outputStructType, str): outputStructType = cast(StructType, _parse_datatype_string(outputStructType)) -def handle_data_with_timers( +def get_timestamps( statefulProcessorApiClient: StatefulProcessorApiClient, -key: Any, -inputRows: Iterator["PandasDataFrameLike"], -) -> Iterator["PandasDataFrameLike"]: -statefulProcessorApiClient.set_implicit_key(key) +) -> Tuple[int, int]: if timeMode != "none": batch_timestamp = statefulProcessorApiClient.get_batch_timestamp() watermark_timestamp = statefulProcessorApiClient.get_watermark_timestamp() else: batch_timestamp = -1 watermark_timestamp = -1 -# process with invalid expiry timer info and emit data rows -data_iter = statefulProcessor.handleInputRows( -key, -inputRows, -TimerValues(batch_timestamp, watermark_timestamp), -ExpiredTimerInfo(False), -) - statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.DATA_PROCESSED) +return batch_timestamp, watermark_timestamp + +def handle_data_with_timers( +statefulProcessorApiClient: StatefulProcessorApiClient, +key: Any, +batch_timestamp: int, +watermark_timestamp: int, +inputRows: Optional[Iterator["PandasDataFrameLike"]] = None, +) -> Iterator["PandasDataFrameLike"]: +statefulProcessorApiClient.set_implicit_key(key) +# process with data rows +if inputRows is not None: +data_iter = statefulProcessor.handleInputRows( +key, inputRows, TimerValues(batch_timestamp, watermark_timestamp) +) +result_iter_list = [data_iter] +statefulProcessorApiClient.set_handle_state( +StatefulProcessorHandleState.DATA_PROCESSED +) +else: +result_iter_list = [] -if timeMode == "processingtime": +if timeMode.lower() == "processingtime": expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator( batch_timestamp ) -elif timeMode == "eventtime": +elif timeMode.lower() == "eventtime": expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator( watermark_timestamp ) else: expiry_list_iter = iter([[]]) -result_iter_list = [data_iter] -# process with valid expiry time info and with empty input rows, -# only timer related rows will be emitted +# process with expiry timers, only timer related rows will be emitted
Review Comment: Thanks so much for catching this! I had introduced a bad correctness bug in my prior timer implementation. I have now moved all timer handling code into `serializer.py`, where expired timers are processed per partition.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
HeartSaVioR commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1849549495
## python/pyspark/sql/pandas/group_ops.py: ## @@ -573,7 +579,11 @@ def transformWithStateUDF( statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.CLOSED) return iter([]) -result = handle_data_with_timers(statefulProcessorApiClient, key, inputRows) +batch_timestamp, watermark_timestamp = get_timestamps(statefulProcessorApiClient)
Review Comment: Ideally this shouldn't be called for every key. If we split the handling of timer expiration out from the handling of input rows, we would only need to call this once.
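A minimal sketch of the hoisting being suggested - fetch the timestamps once per partition and pass them into every per-key call, rather than asking the state server once per key (the class and function here are stand-ins, not the PR's code):

```python
class FakeApiClient:
    """Stand-in for StatefulProcessorApiClient; counts round-trips to the state server."""
    def __init__(self):
        self.calls = 0
    def get_batch_timestamp(self):
        self.calls += 1
        return 1_000
    def get_watermark_timestamp(self):
        self.calls += 1
        return 900

def process_partition(api_client, keys_with_rows, time_mode):
    # fetch timestamps once per partition instead of once per key
    if time_mode != "none":
        batch_ts = api_client.get_batch_timestamp()
        watermark_ts = api_client.get_watermark_timestamp()
    else:
        batch_ts = watermark_ts = -1
    for key, rows in keys_with_rows:
        # every per-key call reuses the same pair of timestamps
        print(key, len(rows), batch_ts, watermark_ts)

client = FakeApiClient()
process_partition(client, [("a", [1, 2]), ("b", [3])], "processingtime")
print("state-server round-trips:", client.calls)  # 2, not 2 * number_of_keys
```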
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
HeartSaVioR commented on PR #48838: URL: https://github.com/apache/spark/pull/48838#issuecomment-2487478749 I'll revisit the PR once my comments are addressed (or @jingz-db has a reasonable point for not doing so), as my proposal would change the code non-trivially.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
HeartSaVioR commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1849528014
## python/pyspark/sql/pandas/group_ops.py: ## @@ -502,53 +502,59 @@ def transformWithStateInPandas( if isinstance(outputStructType, str): outputStructType = cast(StructType, _parse_datatype_string(outputStructType)) -def handle_data_with_timers( +def get_timestamps( statefulProcessorApiClient: StatefulProcessorApiClient, -key: Any, -inputRows: Iterator["PandasDataFrameLike"], -) -> Iterator["PandasDataFrameLike"]: -statefulProcessorApiClient.set_implicit_key(key) +) -> Tuple[int, int]: if timeMode != "none": batch_timestamp = statefulProcessorApiClient.get_batch_timestamp() watermark_timestamp = statefulProcessorApiClient.get_watermark_timestamp() else: batch_timestamp = -1 watermark_timestamp = -1 -# process with invalid expiry timer info and emit data rows -data_iter = statefulProcessor.handleInputRows( -key, -inputRows, -TimerValues(batch_timestamp, watermark_timestamp), -ExpiredTimerInfo(False), -) - statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.DATA_PROCESSED) +return batch_timestamp, watermark_timestamp + +def handle_data_with_timers( +statefulProcessorApiClient: StatefulProcessorApiClient, +key: Any, +batch_timestamp: int, +watermark_timestamp: int, +inputRows: Optional[Iterator["PandasDataFrameLike"]] = None, +) -> Iterator["PandasDataFrameLike"]: +statefulProcessorApiClient.set_implicit_key(key) +# process with data rows +if inputRows is not None: +data_iter = statefulProcessor.handleInputRows( +key, inputRows, TimerValues(batch_timestamp, watermark_timestamp) +) +result_iter_list = [data_iter] +statefulProcessorApiClient.set_handle_state( +StatefulProcessorHandleState.DATA_PROCESSED +) +else: +result_iter_list = [] -if timeMode == "processingtime": +if timeMode.lower() == "processingtime": expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator( batch_timestamp ) -elif timeMode == "eventtime": +elif timeMode.lower() == "eventtime": expiry_list_iter = statefulProcessorApiClient.get_expiry_timers_iterator( watermark_timestamp ) else: expiry_list_iter = iter([[]]) -result_iter_list = [data_iter] -# process with valid expiry time info and with empty input rows, -# only timer related rows will be emitted +# process with expiry timers, only timer related rows will be emitted
Review Comment: I get confused about this every time. Is this relying on the behavior that an expired timer is removed, so we won't list the same timer as expired multiple times? That is very easy to forget. Is there any way we can move this out and do it after we process all input? Can this be done in transformWithStateUDF/transformWithStateWithInitStateUDF with key = null?
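A toy, self-contained model of the shape being asked about - process every grouping key first, then handle expired timers exactly once when the sentinel `None` key arrives (this abstracts away the real `StatefulProcessorApiClient` calls and is not the PR's code):

```python
def run_partition(per_key_groups, handle_input_rows, handle_expired_timers):
    """Model of the proposal: the runner feeds every grouping key first and then a
    final None key, so timer expiration is handled exactly once per partition."""
    for key, rows in list(per_key_groups) + [(None, None)]:
        if key is None:
            # all keys already processed; list expired timers once and emit their rows
            yield from handle_expired_timers()
        else:
            yield from handle_input_rows(key, rows)

out = list(run_partition(
    [("a", [1, 2]), ("b", [3])],
    lambda k, rows: iter([f"data:{k}:{len(rows)}"]),
    lambda: iter(["timer:a-expired", "timer:b-expired"]),
))
print(out)  # data rows for every key first, then the timer rows exactly once
```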
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
jingz-db commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1847130490
## python/pyspark/sql/streaming/stateful_processor.py: ## @@ -420,10 +411,27 @@ def handleInputRows( timer_values: TimerValues Timer value for the current batch that process the input rows. Users can get the processing or event time timestamp from TimerValues. +""" +return iter([]) + +def handleExpiredTimer(
Review Comment: Correct. Added a comment line in the docstring to explicitly say this is optional to implement.
## python/pyspark/sql/streaming/stateful_processor.py: ## @@ -420,10 +411,27 @@ def handleInputRows( timer_values: TimerValues Timer value for the current batch that process the input rows. Users can get the processing or event time timestamp from TimerValues. +""" +return iter([])
Review Comment: Sorry, I confused this with `handleExpiredTimer`. Changed it back to `...`.
Re: [PR] [SPARK-50194][SS][PYTHON] Integration of New Timer API and Initial State API with Timer [spark]
bogao007 commented on code in PR #48838: URL: https://github.com/apache/spark/pull/48838#discussion_r1844724391
## python/pyspark/sql/streaming/stateful_processor.py: ## @@ -420,10 +411,27 @@ def handleInputRows( timer_values: TimerValues Timer value for the current batch that process the input rows. Users can get the processing or event time timestamp from TimerValues. +""" +return iter([])
Review Comment: Why did we change the `...` placeholder here?
## python/pyspark/sql/streaming/stateful_processor.py: ## @@ -420,10 +411,27 @@ def handleInputRows( timer_values: TimerValues Timer value for the current batch that process the input rows. Users can get the processing or event time timestamp from TimerValues. +""" +return iter([]) + +def handleExpiredTimer(
Review Comment: Just double checking: this method is not required for users to implement, correct?
## python/pyspark/sql/pandas/group_ops.py: ## @@ -573,7 +568,16 @@ def transformWithStateUDF( statefulProcessorApiClient.set_handle_state(StatefulProcessorHandleState.CLOSED) return iter([]) -result = handle_data_with_timers(statefulProcessorApiClient, key, inputRows) +if timeMode != "none": +batch_timestamp = statefulProcessorApiClient.get_batch_timestamp() +watermark_timestamp = statefulProcessorApiClient.get_watermark_timestamp() +else: +batch_timestamp = -1 +watermark_timestamp = -1
Review Comment: Can we abstract this into a separate method and share it in both UDFs to reduce redundant code?
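Pulling the three comments together, a hedged sketch of how they resolve: `handleExpiredTimer` keeps a no-op default body so it stays optional, `handleInputRows` keeps the `...` placeholder, and the timestamp lookup is factored into a helper shared by both UDFs. Signatures follow the diffs quoted in this thread; the bodies are illustrative:

```python
from typing import Iterator, Tuple
import pandas as pd


class StatefulProcessor:
    def handleInputRows(self, key, rows, timer_values) -> Iterator[pd.DataFrame]:
        ...  # required: every user processor implements this

    def handleExpiredTimer(self, key, timer_values, expired_timer_info) -> Iterator[pd.DataFrame]:
        # optional: the default is a no-op, so processors that never register
        # timers do not need to override it
        return iter([])


def get_timestamps(stateful_processor_api_client, time_mode) -> Tuple[int, int]:
    # shared by transformWithStateUDF and transformWithStateWithInitStateUDF
    if time_mode != "none":
        batch_timestamp = stateful_processor_api_client.get_batch_timestamp()
        watermark_timestamp = stateful_processor_api_client.get_watermark_timestamp()
    else:
        batch_timestamp = -1
        watermark_timestamp = -1
    return batch_timestamp, watermark_timestamp
```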