bogao007 commented on code in PR #49560:
URL: https://github.com/apache/spark/pull/49560#discussion_r1953587200


##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -640,12 +612,7 @@ def _test_transform_with_state_init_state_in_pandas(
         df = self._build_test_df(input_path)
         self.assertTrue(df.isStreaming)
 
-        output_schema = StructType(
-            [
-                StructField("id", StringType(), True),
-                StructField("value", StringType(), True),
-            ]
-        )
+        output_schema = "id string, value string"

Review Comment:
   Thanks Jing! We didn't change anything around the output schema for TWS, but 
still worth adding the coverage!



##########
sql/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala:
##########
@@ -1034,6 +1038,49 @@ class SparkConnectPlanner(
       .logicalPlan
   }
 
+  private def transformTransformWithStateInPandas(
+      pythonUdf: PythonUDF,
+      groupedDs: RelationalGroupedDataset,
+      rel: proto.GroupMap): LogicalPlan = {
+    val twsInfo = rel.getTransformWithStateInfo
+    val outputSchema = parseSchema(twsInfo.getOutputSchema)
+
+    if (rel.hasInitialInput) {
+      val initialGroupingCols = rel.getInitialGroupingExpressionsList.asScala.toSeq.map(expr =>
+        Column(transformExpression(expr)))
+
+      val initialStateDs = Dataset
+        .ofRows(session, transformRelation(rel.getInitialInput))
+        .groupBy(initialGroupingCols: _*)
+
+      // Explicitly creating UDF on resolved column to avoid ambiguity of analysis on initial state
+      // columns and the input columns
+      val resolvedPythonUDF = createUserDefinedPythonFunction(rel.getFunc)
+        .builder(groupedDs.df.logicalPlan.output)
+        .asInstanceOf[PythonUDF]
+
+      groupedDs

Review Comment:
   Nit: apart from `initialStateDs`, the call is the same in both cases. Can we 
use the if-else clause only to set the value of `initialStateDs` and move this 
part out of the clause?
   
   ```
   val initialStateDs = if (rel.hasInitialInput) {
       ...
   } else {
       null
   }
   
   groupedDs
           .transformWithStateInPandas(
             Column(pythonUdf),
             outputSchema,
             twsInfo.getOutputMode,
             twsInfo.getTimeMode,
             initialStateDs,
             twsInfo.getEventTimeColumnName)
           .logicalPlan
   ```
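
For illustration only, here is the same refactoring shape as a self-contained Python sketch. The names `make_initial_state`, `transform_with_state`, and `build_plan` are hypothetical stand-ins, not Spark APIs. The point is just that only the optional argument depends on the condition, so the shared call can appear once:

```python
from typing import Optional


def make_initial_state() -> str:
    # Hypothetical stand-in for building the grouped initial-state dataset.
    return "initial-state"


def transform_with_state(udf: str, output_schema: str,
                         initial_state: Optional[str]) -> dict:
    # Hypothetical stand-in for the shared transformWithStateInPandas call.
    return {"udf": udf, "schema": output_schema, "initial_state": initial_state}


def build_plan(has_initial_input: bool) -> dict:
    # Only this value differs between the two branches.
    initial_state = make_initial_state() if has_initial_input else None
    # The call itself is written once, outside the conditional.
    return transform_with_state("udf", "id string, value string", initial_state)
```

Whether this shape fits the actual planner code depends on whether both branches can really share the same UDF column, which the sketch does not model.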



##########
sql/connect/common/src/main/protobuf/spark/connect/relations.proto:
##########
@@ -1031,6 +1031,26 @@ message GroupMap {
 
   // (Optional) The schema for the grouped state.
   optional DataType state_schema = 10;
+
+  // Below fields are used by TransformWithState and TransformWithStateInPandas
+  // (Optional) TransformWithState related parameters.
+  optional TransformWithStateInfo transform_with_state_info = 11;
+}
+
+// Additional input parameters used for TransformWithState operator.
+message TransformWithStateInfo {

Review Comment:
   I think we have a separate message for `ApplyInPandasWithState`. When I was 
implementing FlatMapGroupsWithState, it was suggested that I use the existing 
`GroupMap` message, since the two share a lot in common.
   
   In this case, though, I think it might make more sense to have a separate 
message for `TransformWithState`. @hvanhovell, which approach is recommended 
here in Spark Connect?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

