anishshri-db commented on code in PR #50600:
URL: https://github.com/apache/spark/pull/50600#discussion_r2047902178


##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -49,31 +45,43 @@
     pyarrow_requirement_message,
 )
 
-if have_pandas:
-    import pandas as pd
+from pyspark.sql.tests.pandas.helper.helper_pandas_transform_with_state import (
+    SimpleStatefulProcessorWithInitialStateFactory,
+    StatefulProcessorWithInitialStateTimersFactory,
+    StatefulProcessorWithListStateInitialStateFactory,
+    EventTimeStatefulProcessorFactory,
+    ProcTimeStatefulProcessorFactory,
+    SimpleStatefulProcessorFactory,
+    StatefulProcessorChainingOpsFactory,
+    SimpleTTLStatefulProcessorFactory,
+    TTLStatefulProcessorFactory,
+    InvalidSimpleStatefulProcessorFactory,
+    ListStateProcessorFactory,
+    ListStateLargeTTLProcessorFactory,
+    MapStateProcessorFactory,
+    MapStateLargeTTLProcessorFactory,
+    BasicProcessorFactory,
+    BasicProcessorNotNullableFactory,
+    AddFieldsProcessorFactory,
+    RemoveFieldsProcessorFactory,
+    ReorderedFieldsProcessorFactory,
+    UpcastProcessorFactory,
+    MinEventTimeStatefulProcessorFactory,
+)
 
 
-@unittest.skipIf(
-    not have_pandas or not have_pyarrow or os.environ.get("PYTHON_GIL", "?") == "0",
-    cast(
-        str,
-        pandas_requirement_message or pyarrow_requirement_message or "Not supported in no-GIL mode",
-    ),
-)
-class TransformWithStateInPandasTestsMixin:
+class TransformWithStateTestsMixin:
     @classmethod
-    def conf(cls):
-        cfg = SparkConf()
-        cfg.set("spark.sql.shuffle.partitions", "5")
-        cfg.set(
-            "spark.sql.streaming.stateStore.providerClass",
-            "org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
-        )
-        cfg.set("spark.sql.execution.arrow.transformWithStateInPandas.maxRecordsPerBatch", "2")
-        cfg.set("spark.sql.session.timeZone", "UTC")
-        # TODO SPARK-49046 this config is to stop query from FEB sink gracefully
-        cfg.set("spark.sql.streaming.noDataMicroBatches.enabled", "false")
-        return cfg
+    @abstractmethod
+    def use_pandas(cls) -> bool:
+        ...
+
+    @classmethod
+    def get_processor(cls, stateful_processor_factory) -> StatefulProcessor:
+        if cls.use_pandas():
+            return stateful_processor_factory.pandas()
+        else:
+            return stateful_processor_factory.row()

Review Comment:
   Are any tests needed specifically for the new interface?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to