HeartSaVioR commented on code in PR #38008:
URL: https://github.com/apache/spark/pull/38008#discussion_r980721524
##########
python/pyspark/sql/tests/test_pandas_grouped_map_with_state.py:
##########
@@ -90,6 +107,99 @@ def check_results(batch_df, _):
self.assertTrue(q.isActive)
q.processAllAvailable()
+ def test_apply_in_pandas_with_state_python_worker_random_failure(self):
+ output_path = tempfile.mkdtemp()
+ checkpoint_loc = tempfile.mkdtemp()
+ shutil.rmtree(output_path)
+ shutil.rmtree(checkpoint_loc)
+
+ def run_query():
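+ # Read one input file per micro-batch (maxFilesPerTrigger=1) so the random
+ # worker kill below can hit multiple separate batches.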
+ df = self.spark.readStream.format("text") \
+ .option("maxFilesPerTrigger", "1") \
+ .load(self.base_path + "/random_failure/input")
+
+ for q in self.spark.streams.active:
+ q.stop()
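+ # Stop any streaming queries left over from earlier tests before starting a new one.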
+ self.assertTrue(df.isStreaming)
+
+ output_type = StructType(
+ [StructField("value", StringType()), StructField("count", LongType())]
+ )
+ state_type = StructType([StructField("cnt", LongType())])
+
+ def func(key, pdf_iter, state):
+ assert isinstance(state, GroupState)
+
+ # The range should be large enough that the kill is not triggered in every batch,
+ # but small enough that it is still triggered multiple times across batches.
+ if random.randrange(300) == 1:
+ sys.exit(1)
+
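+ # GroupState.getOption returns the current state as a tuple, or None if no
+ # state has been set for this key yet.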
+ count = state.getOption
+ if count is None:
+ count = 0
+ else:
+ count = count[0]
+
+ for pdf in pdf_iter:
+ count += len(pdf)
+
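+ # Persist the running count for this key back into the state store.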
+ state.update((count,))
+ yield pd.DataFrame({"value": [key[0]], "count": [count]})
+
+ q = (
+ df.groupBy(df["value"])
+ .applyInPandasWithState(
+ func, output_type, state_type, "Append", GroupStateTimeout.NoTimeout
+ )
+ .writeStream.queryName("this_query")
+ .format("json")
+ .outputMode("append")
+ .option("path", output_path)
+ .option("checkpointLocation", checkpoint_loc)
+ .start()
+ )
+
+ return q
+
+ q = run_query()
+
+ self.assertEqual(q.name, "this_query")
+ self.assertTrue(q.isActive)
+
+ # The expected_output directory is constructed from the query below:
+ # spark.read.format("text").load("./input").groupBy("value").count() \
+ #     .repartition(1).sort("value").write.format("json").save("./output")
+ expected = self.spark.read.schema("value string, count int").format("json") \
+ .load(self.base_path + "/random_failure/expected_output") \
+ .sort("value").collect()
Review Comment:
Thanks, I didn't know that was the standard approach - I thought it was a workaround. Good
to know.