================
@@ -189,262 +298,322 @@ def _read_packet_thread(self):
             while not done:
                 packet = read_packet(self.recv, trace_file=self.trace_file)
                 # `packet` will be `None` on EOF. We want to pass it down to
-                # handle_recv_packet anyway so the main thread can handle unexpected
-                # termination of lldb-dap and stop waiting for new packets.
+                # handle_recv_packet anyway so the main thread can handle
+                # unexpected termination of lldb-dap and stop waiting for new
+                # packets.
                 done = not self._handle_recv_packet(packet)
         finally:
             dump_dap_log(self.log_file)
 
-    def get_modules(self):
-        module_list = self.request_modules()["body"]["modules"]
-        modules = {}
-        for module in module_list:
-            modules[module["name"]] = module
-        return modules
+    def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
+        """Handles an incoming packet.
 
-    def get_output(self, category, timeout=0.0, clear=True):
-        self.output_condition.acquire()
-        output = None
-        if category in self.output:
-            output = self.output[category]
-            if clear:
-                del self.output[category]
-        elif timeout != 0.0:
-            self.output_condition.wait(timeout)
-            if category in self.output:
-                output = self.output[category]
-                if clear:
-                    del self.output[category]
-        self.output_condition.release()
-        return output
+        Called by the read thread that is waiting for all incoming packets
+        to store the incoming packet in "self._recv_packets" in a thread safe
+        way. This function will then signal the "self._recv_condition" to
+        indicate a new packet is available.
 
-    def collect_output(self, category, timeout_secs, pattern, clear=True):
-        end_time = time.time() + timeout_secs
-        collected_output = ""
-        while end_time > time.time():
-            output = self.get_output(category, timeout=0.25, clear=clear)
-            if output:
-                collected_output += output
-                if pattern is not None and pattern in output:
-                    break
-        return collected_output if collected_output else None
-
-    def _enqueue_recv_packet(self, packet: Optional[ProtocolMessage]):
-        self.recv_condition.acquire()
-        self.recv_packets.append(packet)
-        self.recv_condition.notify()
-        self.recv_condition.release()
+        Args:
+            packet: A new packet to store.
 
-    def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool:
-        """Called by the read thread that is waiting for all incoming packets
-        to store the incoming packet in "self.recv_packets" in a thread safe
-        way. This function will then signal the "self.recv_condition" to
-        indicate a new packet is available. Returns True if the caller
-        should keep calling this function for more packets.
+        Returns:
+            True if the caller should keep calling this function for more
+            packets.
         """
-        # If EOF, notify the read thread by enqueuing a None.
-        if not packet:
-            self._enqueue_recv_packet(None)
-            return False
-
-        # Check the packet to see if is an event packet
-        keepGoing = True
-        packet_type = packet["type"]
-        if packet_type == "event":
-            event = packet["event"]
-            body = None
-            if "body" in packet:
-                body = packet["body"]
-            # Handle the event packet and cache information from these packets
-            # as they come in
-            if event == "output":
-                # Store any output we receive so clients can retrieve it later.
-                category = body["category"]
-                output = body["output"]
-                self.output_condition.acquire()
-                if category in self.output:
-                    self.output[category] += output
-                else:
-                    self.output[category] = output
-                self.output_condition.notify()
-                self.output_condition.release()
-                # no need to add 'output' event packets to our packets list
-                return keepGoing
-            elif event == "initialized":
-                self.initialized = True
-            elif event == "process":
-                # When a new process is attached or launched, remember the
-                # details that are available in the body of the event
-                self.process_event_body = body
-            elif event == "exited":
-                # Process exited, mark the status to indicate the process is not
-                # alive.
-                self.exit_status = body["exitCode"]
-            elif event == "continued":
-                # When the process continues, clear the known threads and
-                # thread_stop_reasons.
-                all_threads_continued = body.get("allThreadsContinued", True)
-                tid = body["threadId"]
-                if tid in self.thread_stop_reasons:
-                    del self.thread_stop_reasons[tid]
-                self._process_continued(all_threads_continued)
-            elif event == "stopped":
-                # Each thread that stops with a reason will send a
-                # 'stopped' event. We need to remember the thread stop
-                # reasons since the 'threads' command doesn't return
-                # that information.
-                self._process_stopped()
-                tid = body["threadId"]
-                self.thread_stop_reasons[tid] = body
-            elif event.startswith("progress"):
-                # Progress events come in as 'progressStart', 'progressUpdate',
-                # and 'progressEnd' events. Keep these around in case test
-                # cases want to verify them.
-                self.progress_events.append(packet)
-            elif event == "breakpoint":
-                # Breakpoint events are sent when a breakpoint is resolved
-                self._update_verified_breakpoints([body["breakpoint"]])
-            elif event == "capabilities":
-                # Update the capabilities with new ones from the event.
-                self.capabilities.update(body["capabilities"])
-
-        elif packet_type == "response":
-            if packet["command"] == "disconnect":
-                keepGoing = False
-        self._enqueue_recv_packet(packet)
-        return keepGoing
+        with self._recv_condition:
+            self._recv_packets.append(packet)
+            self._recv_condition.notify()
+            # packet is None on EOF
+            return packet is not None and not (
+                packet["type"] == "response" and packet["command"] == 
"disconnect"
+            )
+
+    def _recv_packet(
+        self,
+        *,
+        predicate: Optional[Callable[[ProtocolMessage], bool]] = None,
+        timeout: Optional[float] = None,
+    ) -> Optional[ProtocolMessage]:
+        """Processes recived packets from the adapter.
+
+        Updates the DebugCommunication stateful properties based on the 
received
+        packets in the order they are recieved.
+
+        NOTE: The only time the session state properties should be updated is
+        during this call to ensure consistency during tests.
+
+        Args:
+            predicate:
+                Optional, if specified, returns the first packet that matches
+                the given predicate.
+            timeout:
+                Optional, if specified, processes packets until either the
+                timeout occurs or the predicate matches a packet, whichever
+                occurs first.
+
+        Returns:
+            The first matching packet for the given predicate, if specified,
+            otherwise None.
+        """
+        assert (
+            threading.current_thread() != self._recv_thread
+        ), "Must not be called from the _recv_thread"
+
+        def process_until_match():
+            self._process_recv_packets()
+            for i, packet in enumerate(self._pending_packets):
+                if packet is None:
+                    # We need to return a truthy value to break out of the
+                    # wait_for; use `EOFError` as an indicator of EOF.
+                    return EOFError()
+                if predicate and predicate(packet):
+                    self._pending_packets.pop(i)
+                    return packet
+
+        with self._recv_condition:
+            packet = self._recv_condition.wait_for(process_until_match, timeout)
+            return None if isinstance(packet, EOFError) else packet
+
+    def _process_recv_packets(self) -> None:
+        """Process received packets, updating the session state."""
+        with self._recv_condition:
+            for packet in self._recv_packets:
+                # Handle events that may modify any stateful properties of
+                # the DAP session.
+                if packet and packet["type"] == "event":
+                    self._handle_event(packet)
+                elif packet and packet["type"] == "request":
+                    # Handle reverse requests and keep processing.
+                    self._handle_reverse_request(packet)
+                # Move the packet to the pending queue.
+                self._pending_packets.append(packet)
+            self._recv_packets.clear()
+
+    def _handle_event(self, packet: Event) -> None:
+        """Handle any events that modify debug session state we track."""
+        event = packet["event"]
+        body: Optional[Dict] = packet.get("body", None)
+
+        if event == "output":
+            # Store any output we receive so clients can retrieve it later.
+            category = body["category"]
+            output = body["output"]
+            if category in self.output:
+                self.output[category] += output
+            else:
+                self.output[category] = output
+        elif event == "initialized":
+            self.initialized = True
+        elif event == "process":
+            # When a new process is attached or launched, remember the
+            # details that are available in the body of the event
+            self.process_event_body = body
+        elif event == "exited":
+            # Process exited, mark the status to indicate the process is not
+            # alive.
+            self.exit_status = body["exitCode"]
+        elif event == "continued":
+            # When the process continues, clear the known threads and
+            # thread_stop_reasons.
+            all_threads_continued = (
+                body.get("allThreadsContinued", True) if body else True
+            )
+            tid = body["threadId"]
+            if tid in self.thread_stop_reasons:
+                del self.thread_stop_reasons[tid]
+            self._process_continued(all_threads_continued)
+        elif event == "stopped":
+            # Each thread that stops with a reason will send a
+            # 'stopped' event. We need to remember the thread stop
+            # reasons since the 'threads' command doesn't return
+            # that information.
+            self._process_stopped()
+            tid = body["threadId"]
+            self.thread_stop_reasons[tid] = body
+        elif event.startswith("progress"):
+            # Progress events come in as 'progressStart', 'progressUpdate',
+            # and 'progressEnd' events. Keep these around in case test
+            # cases want to verify them.
+            self.progress_events.append(packet)
+        elif event == "breakpoint":
+            # Breakpoint events are sent when a breakpoint is resolved
+            self._update_verified_breakpoints([body["breakpoint"]])
+        elif event == "capabilities":
+            # Update the capabilities with new ones from the event.
+            self.capabilities.update(body["capabilities"])
+
+    def _handle_reverse_request(self, request: Request) -> None:
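+        """Handle reverse requests sent from the adapter to the client."""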
+        if request in self.reverse_requests:
+            return
+        self.reverse_requests.append(request)
+        arguments = request.get("arguments")
+        if request["command"] == "runInTerminal" and arguments is not None:
+            in_shell = arguments.get("argsCanBeInterpretedByShell", False)
+            proc = subprocess.Popen(
+                arguments["args"],
+                env=arguments.get("env", {}),
+                cwd=arguments["cwd"],
+                stdin=subprocess.DEVNULL,
+                stdout=subprocess.DEVNULL,
+                stderr=subprocess.DEVNULL,
+                shell=in_shell,
+            )
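+            # Report the spawned process id back to the adapter, as either
+            # "shellProcessId" or "processId" depending on how it was launched.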
+            body = {}
+            if in_shell:
+                body["shellProcessId"] = proc.pid
+            else:
+                body["processId"] = proc.pid
+            self.send_packet(
+                {
+                    "type": "response",
+                    "seq": 0,
+                    "request_seq": request["seq"],
+                    "success": True,
+                    "command": "runInTerminal",
+                    "message": None,
+                    "body": body,
+                }
+            )
+        elif request["command"] == "startDebugging":
+            self.send_packet(
+                {
+                    "type": "response",
+                    "seq": 0,
+                    "request_seq": request["seq"],
+                    "success": True,
+                    "message": None,
+                    "command": "startDebugging",
+                    "body": {},
+                }
+            )
+        else:
+            desc = 'unknown reverse request "%s"' % (request["command"])
+            raise ValueError(desc)
 
     def _process_continued(self, all_threads_continued: bool):
         self.frame_scopes = {}
         if all_threads_continued:
             self.thread_stop_reasons = {}
 
-    def _update_verified_breakpoints(self, breakpoints: list[Event]):
+    def _update_verified_breakpoints(self, breakpoints: list[Breakpoint]):
         for breakpoint in breakpoints:
-            if "id" in breakpoint:
-                self.resolved_breakpoints[str(breakpoint["id"])] = 
breakpoint.get(
-                    "verified", False
-                )
+            # If no id is set, we cannot correlate the given breakpoint across
+            # requests; ignore it.
+            if "id" not in breakpoint:
+                continue
+
+            self.resolved_breakpoints[str(breakpoint["id"])] = breakpoint.get(
+                "verified", False
+            )
 
-    def send_packet(self, command_dict: Request, set_sequence=True):
+    def _send_recv(self, request: Request) -> Optional[Response]:
+        """Send a command python dictionary as JSON and receive the JSON
+        response. Validates that the response is the correct sequence and
+        command in the reply. Any events that are received are added to the
+        events list in this object"""
+        seq = self.send_packet(request)
+        response = self.receive_response(seq)
+        if response is None:
+            desc = 'no response for "%s"' % (request["command"])
----------------
da-viper wrote:

use the fstring or `.format` syntax
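For illustration (a sketch, not part of the patch), the suggested styles would
look like:

    desc = f'no response for "{request["command"]}"'
    # or
    desc = 'no response for "{}"'.format(request["command"])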

https://github.com/llvm/llvm-project/pull/143818
_______________________________________________
lldb-commits mailing list
lldb-commits@lists.llvm.org
https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits
