HeartSaVioR commented on code in PR #50600:
URL: https://github.com/apache/spark/pull/50600#discussion_r2048052841


##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -49,31 +45,43 @@
     pyarrow_requirement_message,
 )
 
-if have_pandas:
-    import pandas as pd
+from pyspark.sql.tests.pandas.helper.helper_pandas_transform_with_state import (
+    SimpleStatefulProcessorWithInitialStateFactory,
+    StatefulProcessorWithInitialStateTimersFactory,
+    StatefulProcessorWithListStateInitialStateFactory,
+    EventTimeStatefulProcessorFactory,
+    ProcTimeStatefulProcessorFactory,
+    SimpleStatefulProcessorFactory,
+    StatefulProcessorChainingOpsFactory,
+    SimpleTTLStatefulProcessorFactory,
+    TTLStatefulProcessorFactory,
+    InvalidSimpleStatefulProcessorFactory,
+    ListStateProcessorFactory,
+    ListStateLargeTTLProcessorFactory,
+    MapStateProcessorFactory,
+    MapStateLargeTTLProcessorFactory,
+    BasicProcessorFactory,
+    BasicProcessorNotNullableFactory,
+    AddFieldsProcessorFactory,
+    RemoveFieldsProcessorFactory,
+    ReorderedFieldsProcessorFactory,
+    UpcastProcessorFactory,
+    MinEventTimeStatefulProcessorFactory,
+)
 
 
-@unittest.skipIf(
-    not have_pandas or not have_pyarrow or os.environ.get("PYTHON_GIL", "?") == "0",
-    cast(
-        str,
-        pandas_requirement_message or pyarrow_requirement_message or "Not supported in no-GIL mode",
-    ),
-)
-class TransformWithStateInPandasTestsMixin:
+class TransformWithStateTestsMixin:
     @classmethod
-    def conf(cls):
-        cfg = SparkConf()
-        cfg.set("spark.sql.shuffle.partitions", "5")
-        cfg.set(
-            "spark.sql.streaming.stateStore.providerClass",
-            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
-        )
-        cfg.set("spark.sql.execution.arrow.transformWithStateInPandas.maxRecordsPerBatch", "2")
-        cfg.set("spark.sql.session.timeZone", "UTC")
-        # TODO SPARK-49046 this config is to stop query from FEB sink gracefully
-        cfg.set("spark.sql.streaming.noDataMicroBatches.enabled", "false")
-        return cfg
+    @abstractmethod
+    def use_pandas(cls) -> bool:
+        ...
+
+    @classmethod
+    def get_processor(cls, stateful_processor_factory) -> StatefulProcessor:
+        if cls.use_pandas():
+            return stateful_processor_factory.pandas()
+        else:
+            return stateful_processor_factory.row()

Review Comment:
   No, the only difference from the existing API is the user function, so I
think this should be OK as long as we ensure that every existing test works
with the new API as well.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to