[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38008: [SPARK-40571][SS][TESTS] Construct a new test case for applyInPandasWithState to verify fault-tolerance semantic with random py
HeartSaVioR commented on code in PR #38008: URL: https://github.com/apache/spark/pull/38008#discussion_r980722405 ## python/pyspark/sql/tests/test_pandas_grouped_map_with_state.py: ## @@ -46,8 +55,27 @@ cast(str, pandas_requirement_message or pyarrow_requirement_message), ) class GroupedMapInPandasWithStateTests(ReusedSQLTestCase): +@classmethod +def conf(cls): +cfg = SparkConf() +cfg.set("spark.sql.shuffle.partitions", "5") +return cfg + +def __init__(self, methodName="runTest"): +super(GroupedMapInPandasWithStateTests, self).__init__(methodName) +self.base_path = "python/test_support/sql/streaming/apply_in_pandas_with_state" Review Comment: Nice finding! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38008: [SPARK-40571][SS][TESTS] Construct a new test case for applyInPandasWithState to verify fault-tolerance semantic with random py
HeartSaVioR commented on code in PR #38008: URL: https://github.com/apache/spark/pull/38008#discussion_r980721524 ## python/pyspark/sql/tests/test_pandas_grouped_map_with_state.py: ## @@ -90,6 +107,99 @@ def check_results(batch_df, _): self.assertTrue(q.isActive) q.processAllAvailable() +def test_apply_in_pandas_with_state_python_worker_random_failure(self): +output_path = tempfile.mkdtemp() +checkpoint_loc = tempfile.mkdtemp() +shutil.rmtree(output_path) +shutil.rmtree(checkpoint_loc) + +def run_query(): +df = self.spark.readStream.format("text") \ +.option("maxFilesPerTrigger", "1") \ +.load(self.base_path + "/random_failure/input") + +for q in self.spark.streams.active: +q.stop() +self.assertTrue(df.isStreaming) + +output_type = StructType( +[StructField("value", StringType()), StructField("count", LongType())] +) +state_type = StructType([StructField("cnt", LongType())]) + +def func(key, pdf_iter, state): +assert isinstance(state, GroupState) + +# should be huge enough to not trigger kill in every batches +# but should be also reasonable to trigger kill multiple times across batches +if random.randrange(300) == 1: +sys.exit(1) + +count = state.getOption +if count is None: +count = 0 +else: +count = count[0] + +for pdf in pdf_iter: +count += len(pdf) + +state.update((count,)) +yield pd.DataFrame({"value": [key[0]], "count": [count]}) + +q = ( +df.groupBy(df["value"]) +.applyInPandasWithState( +func, output_type, state_type, "Append", GroupStateTimeout.NoTimeout +) +.writeStream.queryName("this_query") +.format("json") +.outputMode("append") +.option("path", output_path) +.option("checkpointLocation", checkpoint_loc) +.start() +) + +return q + +q = run_query() + +self.assertEqual(q.name, "this_query") +self.assertTrue(q.isActive) + +# expected_output directory is constucted from below query: +# spark.read.format("text").load("./input").groupBy("value").count() \ +# .repartition(1).sort("value").write.format("json").save("./output") +expected = self.spark.read.schema("value string, count int").format("json") \ +.load(self.base_path + "/random_failure/expected_output") \ +.sort("value").collect() Review Comment: Thanks, I didn't know that is standard - I thought that's workaround. Good to know. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38008: [SPARK-40571][SS][TESTS] Construct a new test case for applyInPandasWithState to verify fault-tolerance semantic with random py
HeartSaVioR commented on code in PR #38008: URL: https://github.com/apache/spark/pull/38008#discussion_r980721102 ## python/test_support/sql/streaming/apply_in_pandas_with_state/random_failure/input/test-0.txt: ## @@ -0,0 +1,100 @@ +non Review Comment: I just changed both tests to create a dataset files before running a test; neither input files nor golden file is needed. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38008: [SPARK-40571][SS][TESTS] Construct a new test case for applyInPandasWithState to verify fault-tolerance semantic with random py
HeartSaVioR commented on code in PR #38008: URL: https://github.com/apache/spark/pull/38008#discussion_r980719058 ## python/pyspark/sql/tests/test_pandas_grouped_map_with_state.py: ## @@ -90,6 +107,99 @@ def check_results(batch_df, _): self.assertTrue(q.isActive) q.processAllAvailable() +def test_apply_in_pandas_with_state_python_worker_random_failure(self): +output_path = tempfile.mkdtemp() +checkpoint_loc = tempfile.mkdtemp() +shutil.rmtree(output_path) +shutil.rmtree(checkpoint_loc) + +def run_query(): +df = self.spark.readStream.format("text") \ +.option("maxFilesPerTrigger", "1") \ +.load(self.base_path + "/random_failure/input") + +for q in self.spark.streams.active: +q.stop() +self.assertTrue(df.isStreaming) + +output_type = StructType( +[StructField("value", StringType()), StructField("count", LongType())] +) +state_type = StructType([StructField("cnt", LongType())]) + +def func(key, pdf_iter, state): +assert isinstance(state, GroupState) + +# should be huge enough to not trigger kill in every batches +# but should be also reasonable to trigger kill multiple times across batches +if random.randrange(300) == 1: +sys.exit(1) + +count = state.getOption +if count is None: +count = 0 +else: +count = count[0] + +for pdf in pdf_iter: +count += len(pdf) + +state.update((count,)) +yield pd.DataFrame({"value": [key[0]], "count": [count]}) + +q = ( +df.groupBy(df["value"]) +.applyInPandasWithState( +func, output_type, state_type, "Append", GroupStateTimeout.NoTimeout +) +.writeStream.queryName("this_query") +.format("json") +.outputMode("append") +.option("path", output_path) +.option("checkpointLocation", checkpoint_loc) +.start() +) + +return q + +q = run_query() + +self.assertEqual(q.name, "this_query") +self.assertTrue(q.isActive) + +# expected_output directory is constucted from below query: +# spark.read.format("text").load("./input").groupBy("value").count() \ +# .repartition(1).sort("value").write.format("json").save("./output") +expected = self.spark.read.schema("value string, count int").format("json") \ +.load(self.base_path + "/random_failure/expected_output") \ +.sort("value").collect() + +curr_time = time.time() +timeout = curr_time + 120 # 2 minutes Review Comment: Thanks for the info! I didn't notice that. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] [spark] HeartSaVioR commented on a diff in pull request #38008: [SPARk-40571][SS][TESTS] Construct a new test case for applyInPandasWithState to verify fault-tolerance semantic with random py
HeartSaVioR commented on code in PR #38008: URL: https://github.com/apache/spark/pull/38008#discussion_r980664896 ## python/pyspark/sql/tests/test_pandas_grouped_map_with_state.py: ## @@ -90,6 +107,99 @@ def check_results(batch_df, _): self.assertTrue(q.isActive) q.processAllAvailable() +def test_apply_in_pandas_with_state_python_worker_random_failure(self): +output_path = tempfile.mkdtemp() +checkpoint_loc = tempfile.mkdtemp() +shutil.rmtree(output_path) +shutil.rmtree(checkpoint_loc) + +def run_query(): +df = self.spark.readStream.format("text") \ +.option("maxFilesPerTrigger", "1") \ Review Comment: This query runs 10 batches from 10 files, which each file has 100 words. 1000 words in overall. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org