HeartSaVioR commented on code in PR #49560:
URL: https://github.com/apache/spark/pull/49560#discussion_r1951994759


##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -67,6 +67,8 @@ def conf(cls):
         )
         cfg.set("spark.sql.execution.arrow.transformWithStateInPandas.maxRecordsPerBatch", "2")
         cfg.set("spark.sql.session.timeZone", "UTC")
+        # TODO SPARK-50180 this config is to stop query from FEB sink gracefully

Review Comment:
   This TODO refers to a JIRA ticket that is marked as "Duplicated". Shall we update it?



##########
python/pyspark/sql/tests/connect/pandas/test_parity_pandas_transform_with_state.py:
##########
@@ -0,0 +1,61 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import unittest
+
+from pyspark.sql.tests.pandas.test_pandas_transform_with_state import (
+    TransformWithStateInPandasTestsMixin,
+)
+from pyspark import SparkConf
+from pyspark.testing.connectutils import ReusedConnectTestCase
+
+
+class TransformWithStateInPandasParityTests(
+    TransformWithStateInPandasTestsMixin, ReusedConnectTestCase
+):
+    """
+    Spark connect parity tests for TransformWithStateInPandas. Run every test case in
+     `TransformWithStateInPandasTestsMixin` in spark connect mode.
+    """
+
+    @classmethod
+    def conf(cls):
+        cfg = SparkConf(loadDefaults=False)

Review Comment:
   Is this entire thing just to remove the config "spark.master"? Just wanted to understand what we are doing here.



##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########


Review Comment:
   Most of the changes are refactoring, do I understand correctly? I just want to avoid a side-by-side, line-by-line comparison.



##########
python/pyspark/sql/pandas/serializers.py:
##########
@@ -1180,6 +1179,7 @@ def load_stream(self, stream):
         this function works in overall.
         """
         import pyarrow as pa
+        from pyspark.sql.streaming.stateful_processor_util import TransformWithStateInPandasFuncMode

Review Comment:
   Just curious, any specific reason to descope the import?
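
   For readers unfamiliar with the pattern being asked about, here is a minimal sketch of a function-scoped import in plain Python. It uses the standard-library `json` module as a stand-in; `to_json` is a hypothetical name, and nothing here is taken from the PR or explains the author's actual reason.

```python
def to_json(payload: dict) -> str:
    # Function-scoped import: the import statement runs when the function
    # is first called, not when the enclosing module is imported. This is a
    # common way to defer a heavy dependency or sidestep a circular import.
    import json

    return json.dumps(payload, sort_keys=True)


serialized = to_json({"b": 1, "a": 2})
```

   The trade-off is that the dependency is no longer visible at the top of the file, so it is usually worth a short comment saying why the import was moved.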



##########
python/pyspark/sql/pandas/group_ops.py:
##########
@@ -376,7 +363,7 @@ def transformWithStateInPandas(
         timeMode: str,
         initialState: Optional["GroupedData"] = None,
         eventTimeColumnName: str = "",
-    ) -> DataFrame:
+    ) -> "DataFrame":

Review Comment:
   Also curious, what's the effect of the quotes ("") around the type name?
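
   As general background, here is a minimal sketch of what quoting an annotation does in plain Python. It uses `decimal.Decimal` as a stand-in for `DataFrame`; `parse_amount` is a hypothetical name, and it is only an assumption that the PR quotes the type for this reason (for example, because the name is imported under `TYPE_CHECKING`).

```python
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    # Seen only by static type checkers; this branch is skipped at runtime.
    from decimal import Decimal  # stand-in for a heavy or circular import


def parse_amount(text: str) -> "Decimal":
    # The quoted annotation is kept as a plain string, so defining this
    # function does not require the name Decimal to exist at runtime.
    from decimal import Decimal

    return Decimal(text)


# The annotation is stored unevaluated, as the string that was written.
raw_annotation = parse_amount.__annotations__["return"]
```

   Without the quotes, the `def` statement would evaluate the name `Decimal` immediately and raise `NameError` in this layout, because the import under `TYPE_CHECKING` never runs.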



##########
python/pyspark/sql/streaming/stateful_processor_util.py:
##########
@@ -16,13 +16,220 @@
 #
 
 from enum import Enum
+import itertools
+from typing import Any, Iterator, Optional, TYPE_CHECKING
+from pyspark.sql.streaming.stateful_processor_api_client import (
+    StatefulProcessorApiClient,
+    StatefulProcessorHandleState,
+)
+from pyspark.sql.streaming.stateful_processor import (
+    ExpiredTimerInfo,
+    StatefulProcessor,
+    StatefulProcessorHandle,
+    TimerValues,
+)
+
+if TYPE_CHECKING:
+    from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike
 
 # This file places the utilities for transformWithStateInPandas; we have a separate file to avoid
 # putting internal classes to the stateful_processor.py file which contains public APIs.
 
 
 class TransformWithStateInPandasFuncMode(Enum):
+    """
+    Internal mode for python worker UDF mode for transformWithStateInPandas; external mode are in
+    `StatefulProcessorHandleState` for public use purposes.
+    """
+
     PROCESS_DATA = 1
     PROCESS_TIMER = 2
     COMPLETE = 3
     PRE_INIT = 4
+
+
+class TransformWithStateInPandasUdfUtils:
+    """
+    Internal Utility class used for python worker UDF for transformWithStateInPandas. This class is
+    shared for both classic and spark connect mode.
+    """
+
+    def __init__(self, stateful_processor: StatefulProcessor, time_mode: str):
+        self._stateful_processor = stateful_processor
+        self._time_mode = time_mode
+
+    def transformWithStateUDF(

Review Comment:
   I assume you just copied and pasted, but please highlight any code changes beyond the copying. It's hard to tell whether there is also a refactor.



##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -270,14 +272,10 @@ def check_results(batch_df, _):
     # test list state with ttl has the same behavior as list state when state doesn't expire.
     def test_transform_with_state_in_pandas_list_state_large_ttl(self):
         def check_results(batch_df, batch_id):
-            if batch_id == 0:
-                assert set(batch_df.sort("id").collect()) == {
-                    Row(id="0", countAsString="2"),
-                    Row(id="1", countAsString="2"),
-                }
-            else:

Review Comment:
   I'd rather suggest using Trigger.AvailableNow, which gives you what you want from start() -> processAllAvailable() -> stop(). You may still want to disable noDataBatch, though, if you want to simplify.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

