Yicong-Huang commented on code in PR #56192:
URL: https://github.com/apache/spark/pull/56192#discussion_r3391687526


##########
python/benchmarks/bench_eval_type.py:
##########
@@ -1929,3 +1935,173 @@ class WindowAggPandasUDFTimeBench(_WindowAggPandasBenchMixin, _TimeBenchBase):
 
 class WindowAggPandasUDFPeakmemBench(_WindowAggPandasBenchMixin, _PeakmemBenchBase):
     pass
+
+
+# -- SQL_TRANSFORM_WITH_STATE_PANDAS_UDF ---------------------------------------
+# Stateful streaming with Pandas. UDF signature is
+# ``(api_client, mode, key, pdfs)`` and returns ``Iterator[pandas.DataFrame]``.
+# The input wire stream is a single plain Arrow stream pre-sorted by the
+# grouping key column at offset 0; ``TransformWithStateInPandasSerializer``
+# chunks rows into one ``(mode, key, pdfs)`` tuple per group, then emits a
+# phantom ``PROCESS_TIMER`` and ``COMPLETE`` call with an empty pdf iterator.
+# ``StatefulProcessorApiClient.__init__`` opens a real TCP socket to the JVM
+# state server; the stub listener below satisfies that connect. The benchmark
+# UDFs never invoke any state API method, so no protocol exchange is needed.
+
+
+class _StubStateServer:
+    """Stub TCP listener so ``StatefulProcessorApiClient`` init succeeds.
+
+    One instance per benchmark process; the port is reused across all scenarios
+    and ASV iterations. The accept loop stashes connections to keep them open
+    until the worker process tears them down (the worker never closes its end
+    explicitly, but Python GCs the socket on ``main`` return).
+    """
+
+    _instance: "_StubStateServer | None" = None
+
+    @classmethod
+    def get_port(cls) -> int:
+        if cls._instance is None:
+            cls._instance = cls()
+        return cls._instance.port
+
+    def __init__(self) -> None:
+        self._sock = socket.socket()
+        self._sock.bind(("127.0.0.1", 0))
+        self._sock.listen(128)
+        self.port = self._sock.getsockname()[1]
+        self._connections: list[socket.socket] = []
+        self._thread = threading.Thread(target=self._accept_loop, daemon=True)
+        self._thread.start()
+
+    def _accept_loop(self) -> None:
+        while True:
+            try:
+                conn, _ = self._sock.accept()
+            except OSError:
+                break
+            self._connections.append(conn)
+
+
+class _TransformWithStatePandasBenchMixin:
+    """Provides ``_write_scenario`` for SQL_TRANSFORM_WITH_STATE_PANDAS_UDF.
+
+    Each scenario emits one plain Arrow stream pre-sorted by the leading int
+    key column. UDFs receive an iterator of value-only Pandas DataFrames per
+    group plus phantom ``PROCESS_TIMER``/``COMPLETE`` calls (empty iterator).
+    """
+
+    # Each scenario: (num_groups, rows_per_group, num_value_cols).
+    # Row counts are scaled so identity_udf (full pdf passthrough -> ~equal
+    # input and output volume) stays under ASV's 60s per-sample timeout.
+    _scenario_configs = {
+        "few_groups_sm": (50, 5_000, 5),
+        "few_groups_lg": (50, 50_000, 5),
+        "many_groups_sm": (2_000, 500, 5),
+        "many_groups_lg": (500, 2_000, 5),
+        "wide_cols": (200, 5_000, 20),
+    }
+
+    @staticmethod
+    def _build_scenario(name):
+        """Build a single TWS Pandas scenario.
+
+        Returns ``(batches, schema)`` where ``batches`` is a plain list of Arrow
+        RecordBatches with rows pre-sorted by the leading int32 key column.
+        """
+        np.random.seed(42)
+        num_groups, rows_per_group, num_value_cols = (
+            _TransformWithStatePandasBenchMixin._scenario_configs[name]
+        )
+        total_rows = num_groups * rows_per_group
+        key_array = pa.array(
+            np.repeat(np.arange(num_groups, dtype=np.int32), rows_per_group),
+            type=pa.int32(),
+        )
+        value_pool = MockDataFactory.NUMERIC_TYPES

Review Comment:
   Added two scenarios for this: mixed_cols (string/binary/boolean value 
columns, exercising the non-numeric encode paths) and nested_struct (one struct 
column built from the mixed pool, exercising the struct/dict conversion path). 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to