HyukjinKwon commented on code in PR #53273:
URL: https://github.com/apache/spark/pull/53273#discussion_r2579325394


##########
python/run-tests.py:
##########
@@ -74,6 +77,103 @@ def get_valid_filename(s):
         raise RuntimeError("Cannot find assembly build directory, please build 
Spark first.")
 
 
+class TestRunner:
+    def __init__(self, cmd, env, test_output):
+        self.cmd = cmd
+        self.env = env
+        self.test_output = test_output
+        self.pdb_mode = False
+        self.loop = asyncio.new_event_loop()
+        self.master_fd = None
+        self.write_task = None
+
+    def run(self):
+        """
+        Run a command in subprocess, with stdin, stdout, stderr hooked.
+        In normaly case, all the outputs from subprocess will be redirected to
+        the test_output file.
+        When `(Pdb)` is detected, the subprocess will be in interactive mode,
+        and the output will be redirected to the console.
+        """
+        self.master_fd, slave_fd = pty.openpty()
+
+        # Start child connected to the PTY
+        p = subprocess.Popen(
+            self.cmd,
+            env=self.env,
+            stdin=slave_fd,
+            stdout=slave_fd,
+            stderr=slave_fd,
+        )
+        os.close(slave_fd)
+
+        self.loop.run_until_complete(self.handle_inout())
+        return p.wait()
+
+    async def handle_inout(self):
+        await self.read_from_child()
+
+    def output_line(self, line):
+        if self.pdb_mode:
+            sys.stdout.write(line.decode("utf-8", "replace"))
+            sys.stdout.flush()
+        else:
+            if isinstance(self.test_output, io.TextIOBase):
+                self.test_output.write(line.decode("utf-8", "replace"))
+            else:
+                self.test_output.write(line)
+
+    def process_buffer(self, buffer, force_flush=False):
+        # Process all full lines first
+        while (nl := buffer.find(b"\n")) != -1:
+            self.output_line(buffer[:nl + 1])
+            buffer = buffer[nl + 1:]
+        # Process the remaining buffer
+        if b"(Pdb)" in buffer:
+            self.pdb_mode = True
+            self.output_line(buffer)
+            return b""
+        elif force_flush:
+            self.output_line(buffer)
+            return b""
+        else:
+            return buffer
+
+    # Reader: forward child output to our stdout
+    async def read_from_child(self):
+        buffer = b""
+        while True:
+            try:
+                data = await self.loop.run_in_executor(None, os.read, 
self.master_fd, 1024)
+            except OSError:
+                break
+            if not data:
+                break
+            buffer += data
+            buffer = self.process_buffer(buffer)
+            if self.pdb_mode and self.write_task is None:
+                self.write_task = self.loop.create_task(self.write_to_child())
+        buffer = self.process_buffer(buffer, force_flush=True)
+        self.test_output.flush()
+
+        if self.write_task is not None:
+            self.write_task.cancel()
+            try:
+                await self.write_task
+            except asyncio.CancelledError:
+                pass
+            self.write_task = None
+            self.pdb_mode = False
+
+    # Writer: forward our stdin → child tty

Review Comment:
   Let's avoid non-ASCII characters such as `→` here; IIRC the linter fails on them anyway.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to