HeartSaVioR commented on code in PR #47133:
URL: https://github.com/apache/spark/pull/47133#discussion_r1715274058


##########
python/pyspark/sql/streaming/stateful_processor.py:
##########
@@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABC, abstractmethod
+from typing import Any, TYPE_CHECKING, Iterator, Union, cast
+
+from pyspark.sql.streaming.stateful_processor_api_client import 
StatefulProcessorApiClient
+from pyspark.sql.streaming.value_state_client import ValueStateClient
+from pyspark.sql.types import StructType, _parse_datatype_string
+
+if TYPE_CHECKING:
+    from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike
+
+__all__ = ["StatefulProcessor", "StatefulProcessorHandle"]
+
+
+class ValueState:
+    """
+    Class used for arbitrary stateful operations with the v2 API to capture 
single value state.

Review Comment:
   We should not refer to transformWithState as the "v2 API", since only a few
   people would know what v2 means. Please call it by its name.



##########
python/pyspark/sql/streaming/stateful_processor.py:
##########
@@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABC, abstractmethod
+from typing import Any, TYPE_CHECKING, Iterator, Union, cast
+
+from pyspark.sql.streaming.stateful_processor_api_client import 
StatefulProcessorApiClient
+from pyspark.sql.streaming.value_state_client import ValueStateClient
+from pyspark.sql.types import StructType, _parse_datatype_string
+
+if TYPE_CHECKING:
+    from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike
+
+__all__ = ["StatefulProcessor", "StatefulProcessorHandle"]
+
+
+class ValueState:
+    """
+    Class used for arbitrary stateful operations with the v2 API to capture 
single value state.
+
+    .. versionadded:: 4.0.0
+    """
+
+    def __init__(
+        self, value_state_client: ValueStateClient, state_name: str, schema: 
Union[StructType, str]
+    ) -> None:
+        self._value_state_client = value_state_client
+        self._state_name = state_name
+        self.schema = schema
+
+    def exists(self) -> bool:
+        """
+        Whether state exists or not.
+        """
+        return self._value_state_client.exists(self._state_name)
+
+    def get(self) -> Any:

Review Comment:
   Again, we expect a Row as the state value, not a pandas DataFrame. Please let
   me know if you are proposing a pandas DataFrame because it is better suited to
   more state types.



##########
python/pyspark/sql/streaming/stateful_processor.py:
##########
@@ -0,0 +1,160 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from abc import ABC, abstractmethod
+from typing import Any, TYPE_CHECKING, Iterator, Union, cast
+
+from pyspark.sql.streaming.stateful_processor_api_client import 
StatefulProcessorApiClient
+from pyspark.sql.streaming.value_state_client import ValueStateClient
+from pyspark.sql.types import StructType, _parse_datatype_string
+
+if TYPE_CHECKING:
+    from pyspark.sql.pandas._typing import DataFrameLike as PandasDataFrameLike
+
+__all__ = ["StatefulProcessor", "StatefulProcessorHandle"]
+
+
+class ValueState:
+    """
+    Class used for arbitrary stateful operations with the v2 API to capture 
single value state.
+
+    .. versionadded:: 4.0.0
+    """
+
+    def __init__(
+        self, value_state_client: ValueStateClient, state_name: str, schema: 
Union[StructType, str]
+    ) -> None:
+        self._value_state_client = value_state_client
+        self._state_name = state_name
+        self.schema = schema
+
+    def exists(self) -> bool:
+        """
+        Whether state exists or not.
+        """
+        return self._value_state_client.exists(self._state_name)
+
+    def get(self) -> Any:
+        import pandas as pd
+
+        """
+        Get the state value if it exists.
+        """
+        value = self._value_state_client.get(self._state_name)
+        schema = self.schema
+        if isinstance(schema, str):
+            schema = cast(StructType, _parse_datatype_string(schema))
+        columns = [field.name for field in schema.fields]
+        # Create the DataFrame using the values and schema
+        df = pd.DataFrame([value], columns=columns)
+        return df
+
+    def update(self, new_value: Any) -> None:
+        """
+        Update the value of the state.
+        """
+        self._value_state_client.update(self._state_name, self.schema, 
new_value)
+
+    def clear(self) -> None:
+        """
+        Remove this state.
+        """
+        self._value_state_client.clear(self._state_name)
+
+
+class StatefulProcessorHandle:
+    """
+    Represents the operation handle provided to the stateful processor used in 
the arbitrary state

Review Comment:
   nit: transformWithState



##########
python/pyspark/sql/streaming/stateful_processor_api_client.py:
##########
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from enum import Enum
+import os
+import socket
+from typing import Any, Union, cast, Tuple
+
+from pyspark.serializers import write_int, read_int, UTF8Deserializer
+from pyspark.sql.types import StructType, _parse_datatype_string, Row
+from pyspark.sql.utils import has_numpy
+from pyspark.serializers import CPickleSerializer
+from pyspark.errors import PySparkRuntimeError
+
+__all__ = ["StatefulProcessorApiClient", "StatefulProcessorHandleState"]
+
+
+class StatefulProcessorHandleState(Enum):
+    CREATED = 1
+    INITIALIZED = 2
+    DATA_PROCESSED = 3
+    CLOSED = 4
+
+
+class StatefulProcessorApiClient:
+    def __init__(self, state_server_port: int, key_schema: StructType) -> None:
+        self.key_schema = key_schema
+        self._client_socket = socket.socket()
+        self._client_socket.connect(("localhost", state_server_port))
+        self.sockfile = self._client_socket.makefile(
+            "rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
+        )
+        self.handle_state = StatefulProcessorHandleState.CREATED
+        self.utf8_deserializer = UTF8Deserializer()
+        self.pickleSer = CPickleSerializer()
+
+    def set_handle_state(self, state: StatefulProcessorHandleState) -> None:
+        import pyspark.sql.streaming.StateMessage_pb2 as stateMessage
+
+        if state == StatefulProcessorHandleState.CREATED:
+            proto_state = stateMessage.CREATED
+        elif state == StatefulProcessorHandleState.INITIALIZED:
+            proto_state = stateMessage.INITIALIZED
+        elif state == StatefulProcessorHandleState.DATA_PROCESSED:
+            proto_state = stateMessage.DATA_PROCESSED
+        else:
+            proto_state = stateMessage.CLOSED
+        set_handle_state = stateMessage.SetHandleState(state=proto_state)
+        handle_call = 
stateMessage.StatefulProcessorCall(setHandleState=set_handle_state)
+        message = stateMessage.StateRequest(statefulProcessorCall=handle_call)
+
+        self._send_proto_message(message.SerializeToString())
+
+        response_message = self._receive_proto_message()
+        status = response_message[0]
+        if status == 0:
+            self.handle_state = state
+        else:
+            raise PySparkRuntimeError(f"Error setting handle state: " 
f"{response_message[1]}")

Review Comment:
   I see we just map all errors here to PySparkRuntimeError with an error
   message (no classification) - shall we revisit the Scala codebase and ensure
   we give the same error class for the same error?



##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -0,0 +1,280 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tempfile
+from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
+from pyspark.errors import PySparkRuntimeError
+from typing import Iterator
+
+import unittest
+from typing import cast
+
+from pyspark import SparkConf
+from pyspark.sql.functions import split, col
+from pyspark.sql.types import (
+    StringType,
+    StructType,
+    StructField,
+    Row,
+    IntegerType,
+)
+from pyspark.testing.sqlutils import (
+    ReusedSQLTestCase,
+    have_pandas,
+    have_pyarrow,
+    pandas_requirement_message,
+    pyarrow_requirement_message,
+)
+
+if have_pandas:
+    import pandas as pd
+
+
+@unittest.skipIf(
+    not have_pandas or not have_pyarrow,
+    cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+class TransformWithStateInPandasTestsMixin:
+    @classmethod
+    def conf(cls):
+        cfg = SparkConf()
+        cfg.set("spark.sql.shuffle.partitions", "5")
+        cfg.set(
+            "spark.sql.streaming.stateStore.providerClass",
+            
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
+        )
+        return cfg
+
+    def _prepare_test_resource1(self, input_path):
+        with open(input_path + "/text-test1.txt", "w") as fw:
+            fw.write("0, 123\n")
+            fw.write("0, 46\n")
+            fw.write("1, 146\n")
+            fw.write("1, 346\n")
+
+    def _prepare_test_resource2(self, input_path):
+        with open(input_path + "/text-test2.txt", "w") as fw:
+            fw.write("0, 123\n")
+            fw.write("0, 223\n")
+            fw.write("0, 323\n")
+            fw.write("1, 246\n")
+            fw.write("1, 6\n")
+
+    def _build_test_df(self, input_path):
+        df = self.spark.readStream.format("text").option("maxFilesPerTrigger", 
1).load(input_path)
+        df_split = df.withColumn("split_values", split(df["value"], ","))
+        df_split = df_split.select(
+            df_split.split_values.getItem(0).alias("id"),
+            df_split.split_values.getItem(1).alias("temperature"),
+        )
+        df_final = df_split.withColumn("id", 
col("id").cast("string")).withColumn(
+            "temperature", col("temperature").cast("int")
+        )
+        return df_final
+
+    def _test_transform_with_state_in_pandas_basic(
+        self, stateful_processor, check_results, single_batch=False
+    ):
+        input_path = tempfile.mkdtemp()
+        self._prepare_test_resource1(input_path)
+        if not single_batch:
+            self._prepare_test_resource2(input_path)
+
+        df = self._build_test_df(input_path)
+
+        for q in self.spark.streams.active:
+            q.stop()
+        self.assertTrue(df.isStreaming)
+
+        output_schema = StructType(
+            [
+                StructField("id", StringType(), True),
+                StructField("countAsString", StringType(), True),
+            ]
+        )
+
+        q = (
+            df.groupBy("id")
+            .transformWithStateInPandas(
+                statefulProcessor=stateful_processor,
+                outputStructType=output_schema,
+                outputMode="Update",
+                timeMode="None",
+            )
+            .writeStream.queryName("this_query")
+            .foreachBatch(check_results)
+            .outputMode("update")
+            .start()
+        )
+
+        self.assertEqual(q.name, "this_query")
+        self.assertTrue(q.isActive)
+        q.processAllAvailable()
+        self.assertTrue(q.exception() is None)
+
+    def test_transform_with_state_in_pandas_basic(self):
+        def check_results(batch_df, batch_id):
+            if batch_id == 0:
+                assert set(batch_df.sort("id").collect()) == {
+                    Row(id="0", countAsString="2"),
+                    Row(id="1", countAsString="2"),
+                }
+            else:
+                assert set(batch_df.sort("id").collect()) == {
+                    Row(id="0", countAsString="3"),
+                    Row(id="1", countAsString="2"),
+                }
+
+        
self._test_transform_with_state_in_pandas_basic(SimpleStatefulProcessor(), 
check_results)
+
+    def test_transform_with_state_in_pandas_sad_cases(self):

Review Comment:
   nit: shall we be a bit more explicit about what the bad case is? The method
   name is the test name.



##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -0,0 +1,152 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import random
+import shutil
+import string
+import sys
+import tempfile
+import pandas as pd
+from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
+from typing import Iterator
+
+import unittest
+from typing import cast
+
+from pyspark import SparkConf
+from pyspark.sql.streaming.state import GroupStateTimeout, GroupState
+from pyspark.sql.types import (
+    LongType,
+    StringType,
+    StructType,
+    StructField,
+    Row,
+)
+from pyspark.testing.sqlutils import (
+    ReusedSQLTestCase,
+    have_pandas,
+    have_pyarrow,
+    pandas_requirement_message,
+    pyarrow_requirement_message,
+)
+from pyspark.testing.utils import eventually
+
+
+@unittest.skipIf(
+    not have_pandas or not have_pyarrow,
+    cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+class TransformWithStateInPandasTestsMixin:
+    @classmethod
+    def conf(cls):
+        cfg = SparkConf()
+        cfg.set("spark.sql.shuffle.partitions", "5")
+        cfg.set("spark.sql.streaming.stateStore.providerClass",
+                
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider")
+        return cfg
+
+    def _test_apply_in_pandas_with_state_basic(self, func, check_results):
+        input_path = tempfile.mkdtemp()
+
+        def prepare_test_resource():
+            with open(input_path + "/text-test.txt", "w") as fw:
+                fw.write("hello\n")
+                fw.write("this\n")
+
+        prepare_test_resource()
+
+        df = self.spark.readStream.format("text").load(input_path)
+
+        for q in self.spark.streams.active:
+            q.stop()
+        self.assertTrue(df.isStreaming)
+
+        output_type = StructType(
+            [StructField("key", StringType()), StructField("countAsString", 
StringType())]
+        )
+        state_type = StructType([StructField("c", LongType())])
+
+        q = (
+            df.groupBy(df["value"])
+            .transformWithStateInPandas(stateful_processor = 
SimpleStatefulProcessor(),
+                                        outputStructType=output_type,
+                                        outputMode="Update",
+                                        timeMode="None")
+            .writeStream.queryName("this_query")
+            .foreachBatch(check_results)
+            .outputMode("update")
+            .start()
+        )
+
+        self.assertEqual(q.name, "this_query")
+        self.assertTrue(q.isActive)
+        q.processAllAvailable()

Review Comment:
   +1 Shall we ensure the query is stopped, instead of relying on another test
   to stop the leaked query?



##########
python/pyspark/sql/streaming/value_state_client.py:
##########
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import Any, Union, cast, Tuple
+
+from pyspark.sql.streaming.stateful_processor_api_client import 
StatefulProcessorApiClient
+from pyspark.sql.types import StructType, _parse_datatype_string
+from pyspark.errors import PySparkRuntimeError
+
+__all__ = ["ValueStateClient"]
+
+
+class ValueStateClient:
+    def __init__(self, stateful_processor_api_client: 
StatefulProcessorApiClient) -> None:
+        self._stateful_processor_api_client = stateful_processor_api_client
+
+    def exists(self, state_name: str) -> bool:
+        import pyspark.sql.streaming.StateMessage_pb2 as stateMessage
+
+        exists_call = stateMessage.Exists()
+        value_state_call = stateMessage.ValueStateCall(stateName=state_name, 
exists=exists_call)
+        state_variable_request = 
stateMessage.StateVariableRequest(valueStateCall=value_state_call)
+        message = 
stateMessage.StateRequest(stateVariableRequest=state_variable_request)
+
+        
self._stateful_processor_api_client._send_proto_message(message.SerializeToString())
+        response_message = 
self._stateful_processor_api_client._receive_proto_message()
+        status = response_message[0]
+        if status == 0:
+            return True
+        elif status == 1:
+            # server returns 1 if the state does not exist
+            return False
+        else:
+            raise PySparkRuntimeError(
+                f"Error checking value state exists: " f"{response_message[1]}"
+            )
+
+    def get(self, state_name: str) -> Any:
+        import pyspark.sql.streaming.StateMessage_pb2 as stateMessage
+
+        get_call = stateMessage.Get()
+        value_state_call = stateMessage.ValueStateCall(stateName=state_name, 
get=get_call)
+        state_variable_request = 
stateMessage.StateVariableRequest(valueStateCall=value_state_call)
+        message = 
stateMessage.StateRequest(stateVariableRequest=state_variable_request)
+
+        
self._stateful_processor_api_client._send_proto_message(message.SerializeToString())
+        response_message = 
self._stateful_processor_api_client._receive_proto_message()
+        status = response_message[0]
+        if status == 0:
+            return 
self._stateful_processor_api_client._receive_and_deserialize()
+        else:

Review Comment:
   ditto, different errors for "variable wasn't registered" vs "the value does 
not exist".



##########
python/pyspark/sql/streaming/value_state_client.py:
##########
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import Any, Union, cast, Tuple
+
+from pyspark.sql.streaming.stateful_processor_api_client import 
StatefulProcessorApiClient
+from pyspark.sql.types import StructType, _parse_datatype_string
+from pyspark.errors import PySparkRuntimeError
+
+__all__ = ["ValueStateClient"]
+
+
+class ValueStateClient:
+    def __init__(self, stateful_processor_api_client: 
StatefulProcessorApiClient) -> None:
+        self._stateful_processor_api_client = stateful_processor_api_client
+
+    def exists(self, state_name: str) -> bool:
+        import pyspark.sql.streaming.StateMessage_pb2 as stateMessage
+
+        exists_call = stateMessage.Exists()
+        value_state_call = stateMessage.ValueStateCall(stateName=state_name, 
exists=exists_call)
+        state_variable_request = 
stateMessage.StateVariableRequest(valueStateCall=value_state_call)
+        message = 
stateMessage.StateRequest(stateVariableRequest=state_variable_request)
+
+        
self._stateful_processor_api_client._send_proto_message(message.SerializeToString())
+        response_message = 
self._stateful_processor_api_client._receive_proto_message()
+        status = response_message[0]
+        if status == 0:
+            return True
+        elif status == 1:
+            # server returns 1 if the state does not exist
+            return False
+        else:
+            raise PySparkRuntimeError(

Review Comment:
   ditto for this class



##########
python/pyspark/sql/streaming/stateful_processor_api_client.py:
##########
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from enum import Enum
+import os
+import socket
+from typing import Any, Union, cast, Tuple
+
+from pyspark.serializers import write_int, read_int, UTF8Deserializer
+from pyspark.sql.types import StructType, _parse_datatype_string, Row
+from pyspark.sql.utils import has_numpy
+from pyspark.serializers import CPickleSerializer
+from pyspark.errors import PySparkRuntimeError
+
+__all__ = ["StatefulProcessorApiClient", "StatefulProcessorHandleState"]
+
+
+class StatefulProcessorHandleState(Enum):
+    CREATED = 1
+    INITIALIZED = 2
+    DATA_PROCESSED = 3
+    CLOSED = 4
+
+
+class StatefulProcessorApiClient:
+    def __init__(self, state_server_port: int, key_schema: StructType) -> None:
+        self.key_schema = key_schema
+        self._client_socket = socket.socket()
+        self._client_socket.connect(("localhost", state_server_port))
+        self.sockfile = self._client_socket.makefile(
+            "rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
+        )
+        self.handle_state = StatefulProcessorHandleState.CREATED
+        self.utf8_deserializer = UTF8Deserializer()
+        self.pickleSer = CPickleSerializer()
+
+    def set_handle_state(self, state: StatefulProcessorHandleState) -> None:
+        import pyspark.sql.streaming.StateMessage_pb2 as stateMessage
+
+        if state == StatefulProcessorHandleState.CREATED:
+            proto_state = stateMessage.CREATED
+        elif state == StatefulProcessorHandleState.INITIALIZED:
+            proto_state = stateMessage.INITIALIZED
+        elif state == StatefulProcessorHandleState.DATA_PROCESSED:
+            proto_state = stateMessage.DATA_PROCESSED
+        else:
+            proto_state = stateMessage.CLOSED
+        set_handle_state = stateMessage.SetHandleState(state=proto_state)
+        handle_call = 
stateMessage.StatefulProcessorCall(setHandleState=set_handle_state)
+        message = stateMessage.StateRequest(statefulProcessorCall=handle_call)
+
+        self._send_proto_message(message.SerializeToString())
+
+        response_message = self._receive_proto_message()
+        status = response_message[0]
+        if status == 0:
+            self.handle_state = state
+        else:
+            raise PySparkRuntimeError(f"Error setting handle state: " 
f"{response_message[1]}")
+
+    def set_implicit_key(self, key: Tuple) -> None:
+        import pyspark.sql.streaming.StateMessage_pb2 as stateMessage
+
+        key_bytes = self._serialize_to_bytes(self.key_schema, key)
+        set_implicit_key = stateMessage.SetImplicitKey(key=key_bytes)
+        request = 
stateMessage.ImplicitGroupingKeyRequest(setImplicitKey=set_implicit_key)
+        message = stateMessage.StateRequest(implicitGroupingKeyRequest=request)
+
+        self._send_proto_message(message.SerializeToString())
+        response_message = self._receive_proto_message()
+        status = response_message[0]
+        if status != 0:
+            raise PySparkRuntimeError(f"Error setting implicit key: " 
f"{response_message[1]}")
+
+    def remove_implicit_key(self) -> None:
+        import pyspark.sql.streaming.StateMessage_pb2 as stateMessage
+
+        print("calling remove_implicit_key on python side")

Review Comment:
   Is this for debugging purposes, or intentionally left in for future debugging
   context?



##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -0,0 +1,280 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tempfile
+from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
+from pyspark.errors import PySparkRuntimeError
+from typing import Iterator
+
+import unittest
+from typing import cast
+
+from pyspark import SparkConf
+from pyspark.sql.functions import split, col
+from pyspark.sql.types import (
+    StringType,
+    StructType,
+    StructField,
+    Row,
+    IntegerType,
+)
+from pyspark.testing.sqlutils import (
+    ReusedSQLTestCase,
+    have_pandas,
+    have_pyarrow,
+    pandas_requirement_message,
+    pyarrow_requirement_message,
+)
+
+if have_pandas:
+    import pandas as pd
+
+
+@unittest.skipIf(
+    not have_pandas or not have_pyarrow,
+    cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+class TransformWithStateInPandasTestsMixin:
+    @classmethod
+    def conf(cls):
+        cfg = SparkConf()
+        cfg.set("spark.sql.shuffle.partitions", "5")
+        cfg.set(
+            "spark.sql.streaming.stateStore.providerClass",
+            
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
+        )
+        return cfg
+
+    def _prepare_test_resource1(self, input_path):
+        with open(input_path + "/text-test1.txt", "w") as fw:
+            fw.write("0, 123\n")
+            fw.write("0, 46\n")
+            fw.write("1, 146\n")
+            fw.write("1, 346\n")
+
+    def _prepare_test_resource2(self, input_path):
+        with open(input_path + "/text-test2.txt", "w") as fw:
+            fw.write("0, 123\n")
+            fw.write("0, 223\n")
+            fw.write("0, 323\n")
+            fw.write("1, 246\n")
+            fw.write("1, 6\n")
+
+    def _build_test_df(self, input_path):
+        df = self.spark.readStream.format("text").option("maxFilesPerTrigger", 
1).load(input_path)
+        df_split = df.withColumn("split_values", split(df["value"], ","))
+        df_split = df_split.select(
+            df_split.split_values.getItem(0).alias("id"),
+            df_split.split_values.getItem(1).alias("temperature"),
+        )
+        df_final = df_split.withColumn("id", 
col("id").cast("string")).withColumn(
+            "temperature", col("temperature").cast("int")
+        )
+        return df_final
+
+    def _test_transform_with_state_in_pandas_basic(
+        self, stateful_processor, check_results, single_batch=False
+    ):
+        input_path = tempfile.mkdtemp()
+        self._prepare_test_resource1(input_path)
+        if not single_batch:
+            self._prepare_test_resource2(input_path)
+
+        df = self._build_test_df(input_path)
+
+        for q in self.spark.streams.active:
+            q.stop()
+        self.assertTrue(df.isStreaming)
+
+        output_schema = StructType(
+            [
+                StructField("id", StringType(), True),
+                StructField("countAsString", StringType(), True),
+            ]
+        )
+
+        q = (
+            df.groupBy("id")
+            .transformWithStateInPandas(
+                statefulProcessor=stateful_processor,
+                outputStructType=output_schema,
+                outputMode="Update",
+                timeMode="None",
+            )
+            .writeStream.queryName("this_query")
+            .foreachBatch(check_results)
+            .outputMode("update")
+            .start()
+        )
+
+        self.assertEqual(q.name, "this_query")
+        self.assertTrue(q.isActive)
+        q.processAllAvailable()
+        self.assertTrue(q.exception() is None)
+
+    def test_transform_with_state_in_pandas_basic(self):
+        def check_results(batch_df, batch_id):
+            if batch_id == 0:
+                assert set(batch_df.sort("id").collect()) == {
+                    Row(id="0", countAsString="2"),
+                    Row(id="1", countAsString="2"),
+                }
+            else:
+                assert set(batch_df.sort("id").collect()) == {
+                    Row(id="0", countAsString="3"),
+                    Row(id="1", countAsString="2"),
+                }
+
+        
self._test_transform_with_state_in_pandas_basic(SimpleStatefulProcessor(), 
check_results)
+
+    def test_transform_with_state_in_pandas_sad_cases(self):
+        def check_results(batch_df, _):
+            assert set(batch_df.sort("id").collect()) == {
+                Row(id="0", countAsString="0"),
+                Row(id="1", countAsString="0"),
+            }
+
+        self._test_transform_with_state_in_pandas_basic(
+            InvalidSimpleStatefulProcessor(), check_results, True
+        )
+
+    def test_transform_with_state_in_pandas_query_restarts(self):
+        input_path = tempfile.mkdtemp()

Review Comment:
   Since we are using three different sub-directories, shall we name this 
`root_path` and create an `input` subdirectory explicitly?



##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -0,0 +1,280 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tempfile
+from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
+from pyspark.errors import PySparkRuntimeError
+from typing import Iterator
+
+import unittest
+from typing import cast
+
+from pyspark import SparkConf
+from pyspark.sql.functions import split, col
+from pyspark.sql.types import (
+    StringType,
+    StructType,
+    StructField,
+    Row,
+    IntegerType,
+)
+from pyspark.testing.sqlutils import (
+    ReusedSQLTestCase,
+    have_pandas,
+    have_pyarrow,
+    pandas_requirement_message,
+    pyarrow_requirement_message,
+)
+
+if have_pandas:
+    import pandas as pd
+
+
+@unittest.skipIf(
+    not have_pandas or not have_pyarrow,
+    cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+class TransformWithStateInPandasTestsMixin:
+    @classmethod
+    def conf(cls):
+        cfg = SparkConf()
+        cfg.set("spark.sql.shuffle.partitions", "5")
+        cfg.set(
+            "spark.sql.streaming.stateStore.providerClass",
+            
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
+        )
+        return cfg
+
+    def _prepare_test_resource1(self, input_path):
+        with open(input_path + "/text-test1.txt", "w") as fw:
+            fw.write("0, 123\n")
+            fw.write("0, 46\n")
+            fw.write("1, 146\n")
+            fw.write("1, 346\n")
+
+    def _prepare_test_resource2(self, input_path):
+        with open(input_path + "/text-test2.txt", "w") as fw:
+            fw.write("0, 123\n")
+            fw.write("0, 223\n")
+            fw.write("0, 323\n")
+            fw.write("1, 246\n")
+            fw.write("1, 6\n")
+
+    def _build_test_df(self, input_path):
+        df = self.spark.readStream.format("text").option("maxFilesPerTrigger", 
1).load(input_path)
+        df_split = df.withColumn("split_values", split(df["value"], ","))
+        df_split = df_split.select(
+            df_split.split_values.getItem(0).alias("id"),

Review Comment:
   Would adding cast here instead of having withColumn in L84 work?



##########
python/pyspark/sql/streaming/value_state_client.py:
##########
@@ -0,0 +1,98 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from typing import Any, Union, cast, Tuple
+
+from pyspark.sql.streaming.stateful_processor_api_client import 
StatefulProcessorApiClient
+from pyspark.sql.types import StructType, _parse_datatype_string
+from pyspark.errors import PySparkRuntimeError
+
+__all__ = ["ValueStateClient"]
+
+
+class ValueStateClient:
+    def __init__(self, stateful_processor_api_client: 
StatefulProcessorApiClient) -> None:
+        self._stateful_processor_api_client = stateful_processor_api_client
+
+    def exists(self, state_name: str) -> bool:
+        import pyspark.sql.streaming.StateMessage_pb2 as stateMessage
+
+        exists_call = stateMessage.Exists()
+        value_state_call = stateMessage.ValueStateCall(stateName=state_name, 
exists=exists_call)
+        state_variable_request = 
stateMessage.StateVariableRequest(valueStateCall=value_state_call)
+        message = 
stateMessage.StateRequest(stateVariableRequest=state_variable_request)
+
+        
self._stateful_processor_api_client._send_proto_message(message.SerializeToString())
+        response_message = 
self._stateful_processor_api_client._receive_proto_message()
+        status = response_message[0]
+        if status == 0:
+            return True
+        elif status == 1:

Review Comment:
   As I commented, please distinguish two different cases.



##########
python/pyspark/sql/streaming/stateful_processor_api_client.py:
##########
@@ -0,0 +1,167 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from enum import Enum
+import os
+import socket
+from typing import Any, Union, cast, Tuple
+
+from pyspark.serializers import write_int, read_int, UTF8Deserializer
+from pyspark.sql.types import StructType, _parse_datatype_string, Row
+from pyspark.sql.utils import has_numpy
+from pyspark.serializers import CPickleSerializer
+from pyspark.errors import PySparkRuntimeError
+
+__all__ = ["StatefulProcessorApiClient", "StatefulProcessorHandleState"]
+
+
+class StatefulProcessorHandleState(Enum):
+    CREATED = 1
+    INITIALIZED = 2
+    DATA_PROCESSED = 3
+    CLOSED = 4
+
+
+class StatefulProcessorApiClient:
+    def __init__(self, state_server_port: int, key_schema: StructType) -> None:
+        self.key_schema = key_schema
+        self._client_socket = socket.socket()
+        self._client_socket.connect(("localhost", state_server_port))
+        self.sockfile = self._client_socket.makefile(
+            "rwb", int(os.environ.get("SPARK_BUFFER_SIZE", 65536))
+        )
+        self.handle_state = StatefulProcessorHandleState.CREATED
+        self.utf8_deserializer = UTF8Deserializer()
+        self.pickleSer = CPickleSerializer()
+
+    def set_handle_state(self, state: StatefulProcessorHandleState) -> None:
+        import pyspark.sql.streaming.StateMessage_pb2 as stateMessage
+
+        if state == StatefulProcessorHandleState.CREATED:
+            proto_state = stateMessage.CREATED
+        elif state == StatefulProcessorHandleState.INITIALIZED:
+            proto_state = stateMessage.INITIALIZED
+        elif state == StatefulProcessorHandleState.DATA_PROCESSED:
+            proto_state = stateMessage.DATA_PROCESSED
+        else:
+            proto_state = stateMessage.CLOSED
+        set_handle_state = stateMessage.SetHandleState(state=proto_state)
+        handle_call = 
stateMessage.StatefulProcessorCall(setHandleState=set_handle_state)
+        message = stateMessage.StateRequest(statefulProcessorCall=handle_call)
+
+        self._send_proto_message(message.SerializeToString())
+
+        response_message = self._receive_proto_message()
+        status = response_message[0]
+        if status == 0:
+            self.handle_state = state
+        else:
+            raise PySparkRuntimeError(f"Error setting handle state: " 
f"{response_message[1]}")

Review Comment:
   Also, there are internal requests vs. user-side requests. For example, I don't 
expect users to call set_implicit_key by themselves (so errors from such calls are 
internal errors), but I do expect users to call get_value_state (so an error could be 
either user-facing or internal). The error class classification has to be 
different for these cases.



##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -0,0 +1,280 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import tempfile
+from pyspark.sql.streaming import StatefulProcessor, StatefulProcessorHandle
+from pyspark.errors import PySparkRuntimeError
+from typing import Iterator
+
+import unittest
+from typing import cast
+
+from pyspark import SparkConf
+from pyspark.sql.functions import split, col
+from pyspark.sql.types import (
+    StringType,
+    StructType,
+    StructField,
+    Row,
+    IntegerType,
+)
+from pyspark.testing.sqlutils import (
+    ReusedSQLTestCase,
+    have_pandas,
+    have_pyarrow,
+    pandas_requirement_message,
+    pyarrow_requirement_message,
+)
+
+if have_pandas:
+    import pandas as pd
+
+
+@unittest.skipIf(
+    not have_pandas or not have_pyarrow,
+    cast(str, pandas_requirement_message or pyarrow_requirement_message),
+)
+class TransformWithStateInPandasTestsMixin:
+    @classmethod
+    def conf(cls):
+        cfg = SparkConf()
+        cfg.set("spark.sql.shuffle.partitions", "5")
+        cfg.set(
+            "spark.sql.streaming.stateStore.providerClass",
+            
"org.apache.spark.sql.execution.streaming.state.RocksDBStateStoreProvider",
+        )
+        return cfg
+
+    def _prepare_test_resource1(self, input_path):
+        with open(input_path + "/text-test1.txt", "w") as fw:
+            fw.write("0, 123\n")
+            fw.write("0, 46\n")
+            fw.write("1, 146\n")
+            fw.write("1, 346\n")
+
+    def _prepare_test_resource2(self, input_path):
+        with open(input_path + "/text-test2.txt", "w") as fw:
+            fw.write("0, 123\n")
+            fw.write("0, 223\n")
+            fw.write("0, 323\n")
+            fw.write("1, 246\n")
+            fw.write("1, 6\n")
+
+    def _build_test_df(self, input_path):
+        df = self.spark.readStream.format("text").option("maxFilesPerTrigger", 
1).load(input_path)
+        df_split = df.withColumn("split_values", split(df["value"], ","))
+        df_split = df_split.select(
+            df_split.split_values.getItem(0).alias("id"),
+            df_split.split_values.getItem(1).alias("temperature"),
+        )
+        df_final = df_split.withColumn("id", 
col("id").cast("string")).withColumn(
+            "temperature", col("temperature").cast("int")
+        )
+        return df_final
+
+    def _test_transform_with_state_in_pandas_basic(
+        self, stateful_processor, check_results, single_batch=False
+    ):
+        input_path = tempfile.mkdtemp()
+        self._prepare_test_resource1(input_path)
+        if not single_batch:
+            self._prepare_test_resource2(input_path)
+
+        df = self._build_test_df(input_path)
+
+        for q in self.spark.streams.active:
+            q.stop()
+        self.assertTrue(df.isStreaming)
+
+        output_schema = StructType(
+            [
+                StructField("id", StringType(), True),
+                StructField("countAsString", StringType(), True),
+            ]
+        )
+
+        q = (
+            df.groupBy("id")
+            .transformWithStateInPandas(
+                statefulProcessor=stateful_processor,
+                outputStructType=output_schema,
+                outputMode="Update",
+                timeMode="None",
+            )
+            .writeStream.queryName("this_query")
+            .foreachBatch(check_results)
+            .outputMode("update")
+            .start()
+        )
+
+        self.assertEqual(q.name, "this_query")
+        self.assertTrue(q.isActive)
+        q.processAllAvailable()
+        self.assertTrue(q.exception() is None)
+
+    def test_transform_with_state_in_pandas_basic(self):
+        def check_results(batch_df, batch_id):
+            if batch_id == 0:
+                assert set(batch_df.sort("id").collect()) == {
+                    Row(id="0", countAsString="2"),
+                    Row(id="1", countAsString="2"),
+                }
+            else:
+                assert set(batch_df.sort("id").collect()) == {
+                    Row(id="0", countAsString="3"),
+                    Row(id="1", countAsString="2"),
+                }
+
+        
self._test_transform_with_state_in_pandas_basic(SimpleStatefulProcessor(), 
check_results)
+
+    def test_transform_with_state_in_pandas_sad_cases(self):
+        def check_results(batch_df, _):
+            assert set(batch_df.sort("id").collect()) == {
+                Row(id="0", countAsString="0"),
+                Row(id="1", countAsString="0"),
+            }
+
+        self._test_transform_with_state_in_pandas_basic(
+            InvalidSimpleStatefulProcessor(), check_results, True
+        )
+
+    def test_transform_with_state_in_pandas_query_restarts(self):
+        input_path = tempfile.mkdtemp()
+
+        self._prepare_test_resource1(input_path)
+
+        df = self._build_test_df(input_path)
+
+        for q in self.spark.streams.active:
+            q.stop()
+        self.assertTrue(df.isStreaming)
+
+        output_schema = StructType(
+            [
+                StructField("id", StringType(), True),
+                StructField("countAsString", StringType(), True),
+            ]
+        )
+
+        base_query = (
+            df.groupBy("id")
+            .transformWithStateInPandas(
+                statefulProcessor=SimpleStatefulProcessor(),
+                outputStructType=output_schema,
+                outputMode="Update",
+                timeMode="None",
+            )
+            .writeStream.queryName("this_query")
+            .format("parquet")
+            .outputMode("append")
+            .option("checkpointLocation", input_path + "/checkpoint")
+            .option("path", input_path + "/output")
+        )
+        q = base_query.start()
+        self.assertEqual(q.name, "this_query")
+        self.assertTrue(q.isActive)
+        q.processAllAvailable()
+        self.assertTrue(q.exception() is None)
+
+        q.stop()
+
+        self._prepare_test_resource2(input_path)
+
+        q = base_query.start()
+        self.assertEqual(q.name, "this_query")
+        self.assertTrue(q.isActive)
+        q.processAllAvailable()
+        self.assertTrue(q.exception() is None)
+        result_df = self.spark.read.parquet(input_path + "/output")
+        assert set(result_df.sort("id").collect()) == {
+            Row(id="0", countAsString="2"),
+            Row(id="0", countAsString="3"),
+            Row(id="1", countAsString="2"),
+            Row(id="1", countAsString="2"),
+        }
+
+
+class SimpleStatefulProcessor(StatefulProcessor):
+    dict = {0: {"0": 1, "1": 2}, 1: {"0": 4, "1": 3}}
+    batch_id = 0
+
+    def init(self, handle: StatefulProcessorHandle) -> None:
+        state_schema = StructType([StructField("value", IntegerType(), True)])
+        self.num_violations_state = handle.getValueState("numViolations", 
state_schema)
+
+    def handleInputRows(self, key, rows) -> Iterator[pd.DataFrame]:
+        new_violations = 0
+        count = 0
+        key_str = key[0]
+        exists = self.num_violations_state.exists()
+        if exists:
+            existing_violations_pdf = self.num_violations_state.get()
+            existing_violations = existing_violations_pdf.get("value")[0]
+            assert existing_violations == self.dict[0][key_str]
+            self.batch_id = 1
+        else:
+            existing_violations = 0
+        for pdf in rows:
+            pdf_count = pdf.count()
+            count += pdf_count.get("temperature")

Review Comment:
   Same for the API doc example - any reason we count the inputs and count the 
number of violations separately?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to