================ @@ -189,262 +298,322 @@ def _read_packet_thread(self): while not done: packet = read_packet(self.recv, trace_file=self.trace_file) # `packet` will be `None` on EOF. We want to pass it down to - # handle_recv_packet anyway so the main thread can handle unexpected - # termination of lldb-dap and stop waiting for new packets. + # handle_recv_packet anyway so the main thread can handle + # unexpected termination of lldb-dap and stop waiting for new + # packets. done = not self._handle_recv_packet(packet) finally: dump_dap_log(self.log_file) - def get_modules(self): - module_list = self.request_modules()["body"]["modules"] - modules = {} - for module in module_list: - modules[module["name"]] = module - return modules + def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool: + """Handles an incoming packet. - def get_output(self, category, timeout=0.0, clear=True): - self.output_condition.acquire() - output = None - if category in self.output: - output = self.output[category] - if clear: - del self.output[category] - elif timeout != 0.0: - self.output_condition.wait(timeout) - if category in self.output: - output = self.output[category] - if clear: - del self.output[category] - self.output_condition.release() - return output + Called by the read thread that is waiting for all incoming packets + to store the incoming packet in "self._recv_packets" in a thread safe + way. This function will then signal the "self._recv_condition" to + indicate a new packet is available. - def collect_output(self, category, timeout_secs, pattern, clear=True): - end_time = time.time() + timeout_secs - collected_output = "" - while end_time > time.time(): - output = self.get_output(category, timeout=0.25, clear=clear) - if output: - collected_output += output - if pattern is not None and pattern in output: - break - return collected_output if collected_output else None - - def _enqueue_recv_packet(self, packet: Optional[ProtocolMessage]): - self.recv_condition.acquire() - self.recv_packets.append(packet) - self.recv_condition.notify() - self.recv_condition.release() + Args: + packet: A new packet to store. - def _handle_recv_packet(self, packet: Optional[ProtocolMessage]) -> bool: - """Called by the read thread that is waiting for all incoming packets - to store the incoming packet in "self.recv_packets" in a thread safe - way. This function will then signal the "self.recv_condition" to - indicate a new packet is available. Returns True if the caller - should keep calling this function for more packets. + Returns: + True if the caller should keep calling this function for more + packets. """ - # If EOF, notify the read thread by enqueuing a None. - if not packet: - self._enqueue_recv_packet(None) - return False - - # Check the packet to see if is an event packet - keepGoing = True - packet_type = packet["type"] - if packet_type == "event": - event = packet["event"] - body = None - if "body" in packet: - body = packet["body"] - # Handle the event packet and cache information from these packets - # as they come in - if event == "output": - # Store any output we receive so clients can retrieve it later. - category = body["category"] - output = body["output"] - self.output_condition.acquire() - if category in self.output: - self.output[category] += output - else: - self.output[category] = output - self.output_condition.notify() - self.output_condition.release() - # no need to add 'output' event packets to our packets list - return keepGoing - elif event == "initialized": - self.initialized = True - elif event == "process": - # When a new process is attached or launched, remember the - # details that are available in the body of the event - self.process_event_body = body - elif event == "exited": - # Process exited, mark the status to indicate the process is not - # alive. - self.exit_status = body["exitCode"] - elif event == "continued": - # When the process continues, clear the known threads and - # thread_stop_reasons. - all_threads_continued = body.get("allThreadsContinued", True) - tid = body["threadId"] - if tid in self.thread_stop_reasons: - del self.thread_stop_reasons[tid] - self._process_continued(all_threads_continued) - elif event == "stopped": - # Each thread that stops with a reason will send a - # 'stopped' event. We need to remember the thread stop - # reasons since the 'threads' command doesn't return - # that information. - self._process_stopped() - tid = body["threadId"] - self.thread_stop_reasons[tid] = body - elif event.startswith("progress"): - # Progress events come in as 'progressStart', 'progressUpdate', - # and 'progressEnd' events. Keep these around in case test - # cases want to verify them. - self.progress_events.append(packet) - elif event == "breakpoint": - # Breakpoint events are sent when a breakpoint is resolved - self._update_verified_breakpoints([body["breakpoint"]]) - elif event == "capabilities": - # Update the capabilities with new ones from the event. - self.capabilities.update(body["capabilities"]) - - elif packet_type == "response": - if packet["command"] == "disconnect": - keepGoing = False - self._enqueue_recv_packet(packet) - return keepGoing + with self._recv_condition: + self._recv_packets.append(packet) + self._recv_condition.notify() + # packet is None on EOF + return packet is not None and not ( + packet["type"] == "response" and packet["command"] == "disconnect" + ) + + def _recv_packet( + self, + *, + predicate: Optional[Callable[[ProtocolMessage], bool]] = None, + timeout: Optional[float] = None, + ) -> Optional[ProtocolMessage]: + """Processes recived packets from the adapter. + + Updates the DebugCommunication stateful properties based on the received + packets in the order they are recieved. + + NOTE: The only time the session state properties should be updated is + during this call to ensure consistency during tests. + + Args: + predicate: + Optional, if specified, returns the first packet that matches + the given predicate. + timeout: + Optional, if specified, processes packets until either the + timeout occurs or the predicate matches a packet, whichever + occurs first. + + Returns: + The first matching packet for the given predicate, if specified, + otherwise None. + """ + assert ( + threading.current_thread != self._recv_thread + ), "Must not be called from the _recv_thread" + + def process_until_match(): + self._process_recv_packets() + for i, packet in enumerate(self._pending_packets): + if packet is None: + # We need to return a truthy value to break out of the + # wait_for, use `EOFError` as an indicator of EOF. + return EOFError() + if predicate and predicate(packet): + self._pending_packets.pop(i) + return packet + + with self._recv_condition: + packet = self._recv_condition.wait_for(process_until_match, timeout) + return None if isinstance(packet, EOFError) else packet + + def _process_recv_packets(self) -> None: + """Process received packets, updating the session state.""" + with self._recv_condition: + for packet in self._recv_packets: + # Handle events that may modify any stateful properties of + # the DAP session. + if packet and packet["type"] == "event": + self._handle_event(packet) + elif packet and packet["type"] == "request": + # Handle reverse requests and keep processing. + self._handle_reverse_request(packet) + # Move the packet to the pending queue. + self._pending_packets.append(packet) + self._recv_packets.clear() + + def _handle_event(self, packet: Event) -> None: + """Handle any events that modify debug session state we track.""" + event = packet["event"] + body: Optional[Dict] = packet.get("body", None) + + if event == "output": + # Store any output we receive so clients can retrieve it later. + category = body["category"] + output = body["output"] + if category in self.output: + self.output[category] += output + else: + self.output[category] = output + elif event == "initialized": + self.initialized = True + elif event == "process": + # When a new process is attached or launched, remember the + # details that are available in the body of the event + self.process_event_body = body + elif event == "exited": + # Process exited, mark the status to indicate the process is not + # alive. + self.exit_status = body["exitCode"] + elif event == "continued": + # When the process continues, clear the known threads and + # thread_stop_reasons. + all_threads_continued = ( + body.get("allThreadsContinued", True) if body else True + ) + tid = body["threadId"] + if tid in self.thread_stop_reasons: + del self.thread_stop_reasons[tid] + self._process_continued(all_threads_continued) + elif event == "stopped": + # Each thread that stops with a reason will send a + # 'stopped' event. We need to remember the thread stop + # reasons since the 'threads' command doesn't return + # that information. + self._process_stopped() + tid = body["threadId"] + self.thread_stop_reasons[tid] = body + elif event.startswith("progress"): + # Progress events come in as 'progressStart', 'progressUpdate', + # and 'progressEnd' events. Keep these around in case test + # cases want to verify them. + self.progress_events.append(packet) + elif event == "breakpoint": + # Breakpoint events are sent when a breakpoint is resolved + self._update_verified_breakpoints([body["breakpoint"]]) + elif event == "capabilities": + # Update the capabilities with new ones from the event. + self.capabilities.update(body["capabilities"]) + + def _handle_reverse_request(self, request: Request) -> None: + if request in self.reverse_requests: + return + self.reverse_requests.append(request) + arguments = request.get("arguments") + if request["command"] == "runInTerminal" and arguments is not None: + in_shell = arguments.get("argsCanBeInterpretedByShell", False) + proc = subprocess.Popen( + arguments["args"], + env=arguments.get("env", {}), + cwd=arguments["cwd"], + stdin=subprocess.DEVNULL, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + shell=in_shell, + ) + body = {} + if in_shell: + body["shellProcessId"] = proc.pid + else: + body["processId"] = proc.pid + self.send_packet( + { + "type": "response", + "seq": 0, + "request_seq": request["seq"], + "success": True, + "command": "runInTerminal", + "message": None, + "body": body, + } + ) + elif request["command"] == "startDebugging": + self.send_packet( + { + "type": "response", + "seq": 0, + "request_seq": request["seq"], + "success": True, + "message": None, + "command": "startDebugging", + "body": {}, + } + ) + else: + desc = 'unknown reverse request "%s"' % (request["command"]) + raise ValueError(desc) def _process_continued(self, all_threads_continued: bool): self.frame_scopes = {} if all_threads_continued: self.thread_stop_reasons = {} - def _update_verified_breakpoints(self, breakpoints: list[Event]): + def _update_verified_breakpoints(self, breakpoints: list[Breakpoint]): for breakpoint in breakpoints: ---------------- ashgti wrote:
Done. https://github.com/llvm/llvm-project/pull/143818 _______________________________________________ lldb-commits mailing list lldb-commits@lists.llvm.org https://lists.llvm.org/cgi-bin/mailman/listinfo/lldb-commits