HeartSaVioR commented on code in PR #49156:
URL: https://github.com/apache/spark/pull/49156#discussion_r1894535573
##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -907,6 +920,359 @@ def
test_transform_with_state_in_pandas_batch_query_initial_state(self):
Row(id="1", value=str(146 + 346)),
}
+ # This test covers mapState with TTL, an empty state variable
+ # and additional test against initial state python runner
+ def test_transform_with_map_state_metadata(self):
+ checkpoint_path = tempfile.mktemp()
+
+ def check_results(batch_df, batch_id):
+ if batch_id == 0:
+ assert set(batch_df.sort("id").collect()) == {
+ Row(id="0", countAsString="2"),
+ Row(id="1", countAsString="2"),
+ }
+ else:
+ # check for state metadata source
+ metadata_df =
self.spark.read.format("state-metadata").load(checkpoint_path)
+ assert set(
+ metadata_df.select(
+ "operatorId",
+ "operatorName",
+ "stateStoreName",
+ "numPartitions",
+ "minBatchId",
+ "maxBatchId",
+ ).collect()
+ ) == {
+ Row(
+ operatorId=0,
+ operatorName="transformWithStateInPandasExec",
+ stateStoreName="default",
+ numPartitions=5,
+ minBatchId=0,
+ maxBatchId=0,
+ )
+ }
+ operator_properties_json_obj = json.loads(
+ metadata_df.select("operatorProperties").collect()[0][0]
+ )
+ assert operator_properties_json_obj["timeMode"] ==
"ProcessingTime"
+ assert operator_properties_json_obj["outputMode"] == "Update"
+
+ state_var_list = operator_properties_json_obj["stateVariables"]
+ assert len(state_var_list) == 3
+ for state_var in state_var_list:
+ if state_var["stateName"] == "mapState":
+ assert state_var["stateVariableType"] == "MapState"
+ assert state_var["ttlEnabled"]
+ elif state_var["stateName"] == "listState":
+ assert state_var["stateVariableType"] == "ListState"
+ assert not state_var["ttlEnabled"]
+ else:
+ assert state_var["stateName"] ==
"$procTimers_keyToTimestamp"
+ assert state_var["stateVariableType"] == "TimerState"
+
+ # check for state data source
+ map_state_df = (
+ self.spark.read.format("statestore")
+ .option("path", checkpoint_path)
+ .option("stateVarName", "mapState")
+ .load()
+ )
+ assert set(
Review Comment:
nit: is there any reason to convert this to a set when you already sort to
make the order deterministic?
##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -907,6 +920,359 @@ def
test_transform_with_state_in_pandas_batch_query_initial_state(self):
Row(id="1", value=str(146 + 346)),
}
+ # This test covers mapState with TTL, an empty state variable
+ # and additional test against initial state python runner
+ def test_transform_with_map_state_metadata(self):
+ checkpoint_path = tempfile.mktemp()
+
+ def check_results(batch_df, batch_id):
+ if batch_id == 0:
+ assert set(batch_df.sort("id").collect()) == {
+ Row(id="0", countAsString="2"),
+ Row(id="1", countAsString="2"),
+ }
+ else:
+ # check for state metadata source
+ metadata_df =
self.spark.read.format("state-metadata").load(checkpoint_path)
+ assert set(
+ metadata_df.select(
+ "operatorId",
+ "operatorName",
+ "stateStoreName",
+ "numPartitions",
+ "minBatchId",
+ "maxBatchId",
+ ).collect()
+ ) == {
+ Row(
+ operatorId=0,
+ operatorName="transformWithStateInPandasExec",
+ stateStoreName="default",
+ numPartitions=5,
+ minBatchId=0,
+ maxBatchId=0,
+ )
+ }
+ operator_properties_json_obj = json.loads(
+ metadata_df.select("operatorProperties").collect()[0][0]
+ )
+ assert operator_properties_json_obj["timeMode"] ==
"ProcessingTime"
+ assert operator_properties_json_obj["outputMode"] == "Update"
+
+ state_var_list = operator_properties_json_obj["stateVariables"]
+ assert len(state_var_list) == 3
+ for state_var in state_var_list:
+ if state_var["stateName"] == "mapState":
+ assert state_var["stateVariableType"] == "MapState"
+ assert state_var["ttlEnabled"]
+ elif state_var["stateName"] == "listState":
+ assert state_var["stateVariableType"] == "ListState"
+ assert not state_var["ttlEnabled"]
+ else:
+ assert state_var["stateName"] ==
"$procTimers_keyToTimestamp"
+ assert state_var["stateVariableType"] == "TimerState"
+
+ # check for state data source
+ map_state_df = (
Review Comment:
Shall we check the flatten type here as well, as we did for the list state
type?
##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -907,6 +920,359 @@ def
test_transform_with_state_in_pandas_batch_query_initial_state(self):
Row(id="1", value=str(146 + 346)),
}
+ # This test covers mapState with TTL, an empty state variable
+ # and additional test against initial state python runner
+ def test_transform_with_map_state_metadata(self):
+ checkpoint_path = tempfile.mktemp()
+
+ def check_results(batch_df, batch_id):
+ if batch_id == 0:
+ assert set(batch_df.sort("id").collect()) == {
+ Row(id="0", countAsString="2"),
+ Row(id="1", countAsString="2"),
+ }
+ else:
+ # check for state metadata source
+ metadata_df =
self.spark.read.format("state-metadata").load(checkpoint_path)
+ assert set(
+ metadata_df.select(
+ "operatorId",
+ "operatorName",
+ "stateStoreName",
+ "numPartitions",
+ "minBatchId",
+ "maxBatchId",
+ ).collect()
+ ) == {
+ Row(
+ operatorId=0,
+ operatorName="transformWithStateInPandasExec",
+ stateStoreName="default",
+ numPartitions=5,
+ minBatchId=0,
+ maxBatchId=0,
+ )
+ }
+ operator_properties_json_obj = json.loads(
+ metadata_df.select("operatorProperties").collect()[0][0]
+ )
+ assert operator_properties_json_obj["timeMode"] ==
"ProcessingTime"
+ assert operator_properties_json_obj["outputMode"] == "Update"
+
+ state_var_list = operator_properties_json_obj["stateVariables"]
+ assert len(state_var_list) == 3
+ for state_var in state_var_list:
+ if state_var["stateName"] == "mapState":
+ assert state_var["stateVariableType"] == "MapState"
+ assert state_var["ttlEnabled"]
+ elif state_var["stateName"] == "listState":
+ assert state_var["stateVariableType"] == "ListState"
+ assert not state_var["ttlEnabled"]
+ else:
+ assert state_var["stateName"] ==
"$procTimers_keyToTimestamp"
+ assert state_var["stateVariableType"] == "TimerState"
+
+ # check for state data source
+ map_state_df = (
+ self.spark.read.format("statestore")
+ .option("path", checkpoint_path)
+ .option("stateVarName", "mapState")
+ .load()
+ )
+ assert set(
+ map_state_df.selectExpr(
+ "key.id AS groupingKey",
+ "user_map_key.name AS mapKey",
+ "user_map_value.value.count AS mapValue",
+ )
+ .sort("groupingKey")
+ .collect()
+ ) == {
+ Row(groupingKey="0", mapKey="key2", mapValue=2),
+ Row(groupingKey="1", mapKey="key2", mapValue=2),
+ }
+
+ ttl_df = map_state_df.selectExpr(
+ "user_map_value.ttlExpirationMs AS TTLVal"
+ ).collect()
+ # check if there are two rows containing TTL value in map
state dataframe
+ assert len(ttl_df) == 2
+ # check if two rows are of the same TTL value
+ assert len(set(ttl_df)) == 1
+
+ list_state_df = (
+ self.spark.read.format("statestore")
+ .option("path", checkpoint_path)
+ .option("stateVarName", "listState")
+ .load()
+ )
+ assert list_state_df.isEmpty()
+
+ for q in self.spark.streams.active:
+ q.stop()
+
+ self._test_transform_with_state_in_pandas_basic(
+ MapStateLargeTTLProcessor(),
+ check_results,
+ True,
+ "processingTime",
+ checkpoint_path=checkpoint_path,
+ initial_state=None,
+ )
+
+ # run the same test suite again but with no-op initial state
+ # TWS with initial state is using a different python runner
+ init_data = [("0", 789), ("3", 987)]
+ initial_state = self.spark.createDataFrame(init_data, "id string,
temperature int").groupBy(
+ "id"
+ )
+ self._test_transform_with_state_in_pandas_basic(
+ MapStateLargeTTLProcessor(),
Review Comment:
I was wondering how the result could be the same with and without an initial
state. So it looks like we do not do anything with the initial state in that
processor, right? I'm OK with it; I just want to double-check.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]