zhengruifeng commented on code in PR #53528:
URL: https://github.com/apache/spark/pull/53528#discussion_r2652024776


##########
python/run-tests.py:
##########
@@ -234,27 +234,43 @@ def run_individual_python_test(target_dir, test_name, 
pyspark_python, keep_test_
 
     env["PYSPARK_SUBMIT_ARGS"] = " ".join(spark_args)
 
+    timeout = os.environ.get("PYSPARK_TEST_TIMEOUT")
+    if timeout is not None:
+        env["PYSPARK_TEST_TIMEOUT"] = timeout
+        timeout = int(timeout)
+
     output_prefix = get_valid_filename(pyspark_python + "__" + test_name + 
"__").lstrip("_")
     # Delete is always set to False since the cleanup will be either done by 
removing the
     # whole test dir, or the test output is retained.
     per_test_output = tempfile.NamedTemporaryFile(prefix=output_prefix, 
dir=tmp_dir,
                                                   suffix=".log", delete=False)
     LOGGER.info(
         "Starting test(%s): %s (temp output: %s)", pyspark_python, test_name, 
per_test_output.name)
+    cmd = [os.path.join(SPARK_HOME, "bin/pyspark")] + test_name.split()
     start_time = time.time()
+
+    retcode = None
+    proc = None
     try:
-        retcode = TestRunner(
-            [os.path.join(SPARK_HOME, "bin/pyspark")] + test_name.split(),
-            env,
-            per_test_output
-        ).run()
+        if timeout:
+            proc = subprocess.Popen(cmd, stderr=per_test_output, 
stdout=per_test_output, env=env)
+            retcode = proc.wait(timeout=timeout)
+        else:
+            retcode = TestRunner(cmd, env, per_test_output).run()
         if not keep_test_output:
             # There exists a race condition in Python and it causes flakiness 
in MacOS
             # https://github.com/python/cpython/issues/73885
             if platform.system() == "Darwin":
                 os.system("rm -rf " + tmp_dir)
             else:
                 shutil.rmtree(tmp_dir, ignore_errors=True)
+    except subprocess.TimeoutExpired:
+        if timeout and proc:
+            LOGGER.exception("Got TimeoutExpired while running %s with %s", 
test_name, pyspark_python)
+            proc.terminate()
+            proc.communicate(timeout=60)

Review Comment:
   In one of my local runs, `proc.communicate` itself appeared to get stuck, 
so I also added a timeout here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to