HeartSaVioR commented on code in PR #48124:
URL: https://github.com/apache/spark/pull/48124#discussion_r1849453476
##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -701,6 +722,109 @@ def check_results(batch_df, batch_id):
SimpleStatefulProcessorWithInitialState(), check_results,
initial_state
)
+ def _test_transform_with_state_in_pandas_chaining_ops(
+ self, stateful_processor, check_results, timeMode="None",
grouping_cols=["outputTimestamp"]
+ ):
+ import pyspark.sql.functions as f
+
+ input_path = tempfile.mkdtemp()
+ self._prepare_test_resource1(input_path)
+
+ def prepare_batch1(input_path):
+ with open(input_path + "/text-test3.txt", "w") as fw:
+ fw.write("a, 20\n")
+
+ def prepare_batch2(input_path):
+ with open(input_path + "/text-test4.txt", "w") as fw:
+ fw.write("a, 3\n")
+
+ def prepare_batch3(input_path):
+ with open(input_path + "/text-test1.txt", "w") as fw:
+ fw.write("a, 4\n")
+
+ def prepare_batch4(input_path):
+ with open(input_path + "/text-test2.txt", "w") as fw:
+ fw.write("a, 20\n")
+
+ prepare_batch1(input_path)
+ time.sleep(2)
+ prepare_batch2(input_path)
+ time.sleep(2)
+ prepare_batch3(input_path)
+ time.sleep(2)
+ prepare_batch4(input_path)
+
+ df = self._build_test_df(input_path)
+ df = df.select(
+ "id",
f.from_unixtime(f.col("temperature")).alias("eventTime").cast("timestamp")
+ ).withWatermark("eventTime", "10 seconds")
+
+ for q in self.spark.streams.active:
+ q.stop()
+ self.assertTrue(df.isStreaming)
+
+ output_schema = StructType(
+ [
+ StructField("id", StringType(), True),
+ StructField("outputTimestamp", TimestampType(), True),
+ ]
+ )
+
+ q = (
+ df.groupBy("id")
+ .transformWithStateInPandas(
+ statefulProcessor=stateful_processor,
+ outputStructType=output_schema,
+ outputMode="Update",
+ timeMode=timeMode,
+ eventTimeColumnName="outputTimestamp",
+ )
+ .groupBy(grouping_cols)
+ .count()
+ .writeStream.queryName("chaining_ops_query")
+ .foreachBatch(check_results)
+ .outputMode("update")
Review Comment:
Update mode? We don't support multiple stateful operators in update mode.
If the query does not break, we are failing to recognize this operator as
stateful. We should check UnsupportedOperator.
##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala:
##########
@@ -1031,6 +1031,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
// Can't prune the columns on LeafNode
case p @ Project(_, _: LeafNode) => p
+ // Can't prune the columns on UpdateEventTimeWatermarkColumn
+ case p@Project(_, _: UpdateEventTimeWatermarkColumn) => p
Review Comment:
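nit: add a space on both sides of `@` (between `p` and `@`, and between `@` and `Project`), i.e. `case p @ Project(...)`, matching the case above.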
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]