micheal-o commented on code in PR #48124:
URL: https://github.com/apache/spark/pull/48124#discussion_r1762220932
##########
python/pyspark/sql/tests/pandas/test_pandas_transform_with_state.py:
##########
@@ -304,6 +306,93 @@ def check_results(batch_df, batch_id):
finally:
input_dir.cleanup()
+ def _test_transform_with_state_in_pandas_chaining_ops(
+ self, stateful_processor, check_results, timeMode="None"
+ ):
+ import pyspark.sql.functions as f
+
+ input_path = tempfile.mkdtemp()
+ self._prepare_test_resource1(input_path)
+
+ def prepare_batch1(input_path):
+ with open(input_path + "/text-test3.txt", "w") as fw:
+ fw.write("a, 20\n")
+
+ def prepare_batch2(input_path):
+ with open(input_path + "/text-test1.txt", "w") as fw:
+ fw.write("a, 4\n")
+
+ def prepare_batch3(input_path):
+ with open(input_path + "/text-test2.txt", "w") as fw:
+ fw.write("a, 11\n")
+ fw.write("a, 13\n")
+ fw.write("a, 15\n")
+
+ prepare_batch1(input_path)
+ prepare_batch2(input_path)
+ prepare_batch3(input_path)
+
+ df = self._build_test_df(input_path)
+ df = df.select("id",
+ f.from_unixtime(f.col("temperature")).alias("eventTime").cast("timestamp")) \
+ .withWatermark("eventTime", "10 seconds")
+
+ for q in self.spark.streams.active:
+ q.stop()
+ self.assertTrue(df.isStreaming)
+
+ output_schema = StructType(
+ [
+ StructField("id", StringType(), True),
+ StructField("outputTimestamp", TimestampType(), True)
+ ]
+ )
+
+ q = (
+ df.groupBy("id")
+ .transformWithStateInPandas(
+ statefulProcessor=stateful_processor,
+ outputStructType=output_schema,
+ outputMode="Update",
+ timeMode=timeMode,
+ eventTimeColumnName="outputTimestamp"
+ )
+ .groupBy("outputTimestamp")
+ .count()
+ .writeStream.queryName("chaining_ops_query")
+ .foreachBatch(check_results)
+ .outputMode("update")
+ .start()
+ )
+
+ self.assertEqual(q.name, "chaining_ops_query")
+ self.assertTrue(q.isActive)
+ q.processAllAvailable()
+ q.awaitTermination(10)
Review Comment:
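nit: why wait for 10 seconds in `q.awaitTermination(10)`, given that `q.processAllAvailable()` on the previous line already blocks until all available input has been processed?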
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]