bogao007 commented on code in PR #49560:
URL: https://github.com/apache/spark/pull/49560#discussion_r1953587200
##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -640,12 +612,7 @@ def _test_transform_with_state_init_state_in_pandas(
df = self._build_test_df(input_path)
self.assertTrue(df.isStreaming)
- output_schema = StructType(
- [
- StructField("id", StringType(), True),
- StructField("value", StringType(), True),
- ]
- )
+ output_schema = "id string, value string"
Review Comment:
Thanks Jing! We didn't change anything around the output schema for TWS, but
it's still worth adding the coverage!
##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1034,6 +1038,49 @@ class SparkConnectPlanner(
.logicalPlan
}
+ private def transformTransformWithStateInPandas(
+ pythonUdf: PythonUDF,
+ groupedDs: RelationalGroupedDataset,
+ rel: proto.GroupMap): LogicalPlan = {
+ val twsInfo = rel.getTransformWithStateInfo
+ val outputSchema = parseSchema(twsInfo.getOutputSchema)
+
+ if (rel.hasInitialInput) {
+ val initialGroupingCols = rel.getInitialGroupingExpressionsList.asScala.toSeq.map(expr =>
+ Column(transformExpression(expr)))
+
+ val initialStateDs = Dataset
+ .ofRows(session, transformRelation(rel.getInitialInput))
+ .groupBy(initialGroupingCols: _*)
+
+ // Explicitly creating UDF on resolved column to avoid ambiguity of analysis on initial state
+ // columns and the input columns
+ val resolvedPythonUDF = createUserDefinedPythonFunction(rel.getFunc)
+ .builder(groupedDs.df.logicalPlan.output)
+ .asInstanceOf[PythonUDF]
+
+ groupedDs
Review Comment:
Nit: Aside from `initialStateDs`, both branches are the same. Can we use the
if-else clause only to set `initialStateDs` and move the common part out of
the clause? (Note a `var` is needed here, since a `val` can't be reassigned.)
```
var initialStateDs: RelationalGroupedDataset = null
if (rel.hasInitialInput) {
  initialStateDs = ...
}
groupedDs
  .transformWithStateInPandas(
    Column(pythonUdf),
    outputSchema,
    twsInfo.getOutputMode,
    twsInfo.getTimeMode,
    initialStateDs,
    twsInfo.getEventTimeColumnName)
  .logicalPlan
```
##########
sql/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -1031,6 +1031,26 @@ message GroupMap {
// (Optional) The schema for the grouped state.
optional DataType state_schema = 10;
+
+ // Below fields are used by TransformWithState and TransformWithStateInPandas
+ // (Optional) TransformWithState related parameters.
+ optional TransformWithStateInfo transform_with_state_info = 11;
+}
+
+// Additional input parameters used for TransformWithState operator.
+message TransformWithStateInfo {
Review Comment:
I think for `ApplyInPandasWithState`, we have a separate message. When I was
implementing FlatMapGroupsWithState, it was suggested that I use the existing
`GroupMap` message since they share a lot in common.
For this case though, I think it might make more sense to have a separate
message for `TransformWithState`. @hvanhovell which way is more recommended
here in Spark Connect?
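For illustration only, a standalone message could mirror the fields the planner reads above. The sketch below is hypothetical, not part of the PR: the field names follow the accessors used in `SparkConnectPlanner` (`getInitialInput`, `getInitialGroupingExpressionsList`, `getFunc`, and the `TransformWithStateInfo` getters), and the types and tag numbers are made up.

```protobuf
// Hypothetical standalone relation message, sketched from the accessors
// used in transformTransformWithStateInPandas above. Not a PR proposal.
message TransformWithState {
  Relation input = 1;
  repeated Expression grouping_expressions = 2;
  CommonInlineUserDefinedFunction func = 3;
  string output_schema = 4;
  string output_mode = 5;
  string time_mode = 6;
  optional string event_time_column_name = 7;
  optional Relation initial_input = 8;
  repeated Expression initial_grouping_expressions = 9;
}
```

The trade-off is the same one noted for `GroupMap`: a dedicated message avoids optional fields that only one operator uses, at the cost of duplicating the grouping/UDF fields shared with the other grouped-map relations.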
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]