https://github.com/python/cpython/commit/d6faac6d1f825405398158272286aaed94eb51fc
commit: d6faac6d1f825405398158272286aaed94eb51fc
branch: 3.13
author: Miss Islington (bot) <31488909+miss-isling...@users.noreply.github.com>
committer: ambv <luk...@langa.pl>
date: 2024-05-31T17:18:28+02:00
summary:

[3.13] gh-111201: Support pyrepl on Windows (GH-119559) (GH-119850)

(cherry picked from commit 0d07182821fad7b95a043d006f1ce13a2d22edcb)

Co-authored-by: Dino Viehland <dinoviehl...@gmail.com>
Co-authored-by: Anthony Shaw <anthony.p.s...@gmail.com>
Co-authored-by: Łukasz Langa <luk...@langa.pl>

files:
A Lib/_pyrepl/windows_console.py
A Lib/test/test_pyrepl/test_windows_console.py
A Misc/NEWS.d/next/Windows/2024-05-25-18-43-10.gh-issue-111201.SLPJIx.rst
M Doc/whatsnew/3.13.rst
M Lib/_pyrepl/__main__.py
M Lib/_pyrepl/console.py
M Lib/_pyrepl/reader.py
M Lib/_pyrepl/readline.py
M Lib/_pyrepl/simple_interact.py
M Lib/_pyrepl/unix_console.py
M Lib/test/test_pyrepl/__init__.py
M Lib/test/test_pyrepl/support.py
M Lib/test/test_pyrepl/test_pyrepl.py
M Lib/test/test_pyrepl/test_unix_console.py
M Lib/test/test_pyrepl/test_unix_eventqueue.py

diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst
index 8799cf2ce2bd2d..d09d7ce9864575 100644
--- a/Doc/whatsnew/3.13.rst
+++ b/Doc/whatsnew/3.13.rst
@@ -154,10 +154,10 @@ New Features
 A Better Interactive Interpreter
 --------------------------------
 
-On Unix-like systems like Linux or macOS, Python now uses a new
-:term:`interactive` shell. When the user starts the :term:`REPL` from an
-interactive terminal, and both :mod:`curses` and :mod:`readline` are
-available, the interactive shell now supports the following new features:
+On Unix-like systems like Linux or macOS as well as Windows, Python now
+uses a new :term:`interactive` shell. When the user starts the
+:term:`REPL` from an interactive terminal, the interactive shell now
+supports the following new features:
 
 * Colorized prompts.
 * Multiline editing with history preservation.
@@ -174,10 +174,13 @@ available, the interactive shell now supports the 
following new features:
 If the new interactive shell is not desired, it can be disabled via
 the :envvar:`PYTHON_BASIC_REPL` environment variable.
 
+The new shell requires :mod:`curses` on Unix-like systems.
+
 For more on interactive mode, see :ref:`tut-interac`.
 
 (Contributed by Pablo Galindo Salgado, Łukasz Langa, and
-Lysandros Nikolaou in :gh:`111201` based on code from the PyPy project.)
+Lysandros Nikolaou in :gh:`111201` based on code from the PyPy project.
+Windows support contributed by Dino Viehland and Anthony Shaw.)
 
 .. _whatsnew313-improved-error-messages:
 
diff --git a/Lib/_pyrepl/__main__.py b/Lib/_pyrepl/__main__.py
index c598019e7cd4ad..dae4ba6e178b9a 100644
--- a/Lib/_pyrepl/__main__.py
+++ b/Lib/_pyrepl/__main__.py
@@ -1,7 +1,11 @@
 import os
 import sys
 
-CAN_USE_PYREPL = sys.platform != "win32"
+CAN_USE_PYREPL: bool
+if sys.platform != "win32":
+    CAN_USE_PYREPL = True
+else:
+    CAN_USE_PYREPL = sys.getwindowsversion().build >= 10586  # Windows 10 TH2
 
 
 def interactive_console(mainmodule=None, quiet=False, pythonstartup=False):
diff --git a/Lib/_pyrepl/console.py b/Lib/_pyrepl/console.py
index d7e86e768671dc..fcabf785069ecb 100644
--- a/Lib/_pyrepl/console.py
+++ b/Lib/_pyrepl/console.py
@@ -19,10 +19,18 @@
 
 from __future__ import annotations
 
+import sys
+
 from abc import ABC, abstractmethod
 from dataclasses import dataclass, field
 
 
+TYPE_CHECKING = False
+
+if TYPE_CHECKING:
+    from typing import IO
+
+
 @dataclass
 class Event:
     evt: str
@@ -36,6 +44,25 @@ class Console(ABC):
     height: int = 25
     width: int = 80
 
+    def __init__(
+        self,
+        f_in: IO[bytes] | int = 0,
+        f_out: IO[bytes] | int = 1,
+        term: str = "",
+        encoding: str = "",
+    ):
+        self.encoding = encoding or sys.getdefaultencoding()
+
+        if isinstance(f_in, int):
+            self.input_fd = f_in
+        else:
+            self.input_fd = f_in.fileno()
+
+        if isinstance(f_out, int):
+            self.output_fd = f_out
+        else:
+            self.output_fd = f_out.fileno()
+
     @abstractmethod
     def refresh(self, screen: list[str], xy: tuple[int, int]) -> None: ...
 
@@ -108,5 +135,4 @@ def wait(self) -> None:
         ...
 
     @abstractmethod
-    def repaint(self) -> None:
-        ...
+    def repaint(self) -> None: ...
diff --git a/Lib/_pyrepl/reader.py b/Lib/_pyrepl/reader.py
index 1c816d5bda5fed..8d9a22c272f88b 100644
--- a/Lib/_pyrepl/reader.py
+++ b/Lib/_pyrepl/reader.py
@@ -442,14 +442,13 @@ def get_arg(self, default: int = 1) -> int:
         """
         if self.arg is None:
             return default
-        else:
-            return self.arg
+        return self.arg
 
     def get_prompt(self, lineno: int, cursor_on_line: bool) -> str:
         """Return what should be in the left-hand margin for line
         `lineno'."""
         if self.arg is not None and cursor_on_line:
-            prompt = "(arg: %s) " % self.arg
+            prompt = f"(arg: {self.arg}) "
         elif self.paste_mode:
             prompt = "(paste) "
         elif "\n" in self.buffer:
@@ -515,12 +514,12 @@ def pos2xy(self) -> tuple[int, int]:
             offset = l - 1 if in_wrapped_line else l  # need to remove 
backslash
             if offset >= pos:
                 break
+
+            if p + sum(l2) >= self.console.width:
+                pos -= l - 1  # -1 cause backslash is not in buffer
             else:
-                if p + sum(l2) >= self.console.width:
-                    pos -= l - 1  # -1 cause backslash is not in buffer
-                else:
-                    pos -= l + 1  # +1 cause newline is in buffer
-                y += 1
+                pos -= l + 1  # +1 cause newline is in buffer
+            y += 1
         return p + sum(l2[:pos]), y
 
     def insert(self, text: str | list[str]) -> None:
@@ -582,7 +581,6 @@ def suspend(self) -> SimpleContextManager:
             for arg in ("msg", "ps1", "ps2", "ps3", "ps4", "paste_mode"):
                 setattr(self, arg, prev_state[arg])
             self.prepare()
-            pass
 
     def finish(self) -> None:
         """Called when a command signals that we're finished."""
diff --git a/Lib/_pyrepl/readline.py b/Lib/_pyrepl/readline.py
index 01da926941b256..7d811bf41773fe 100644
--- a/Lib/_pyrepl/readline.py
+++ b/Lib/_pyrepl/readline.py
@@ -38,7 +38,14 @@
 
 from . import commands, historical_reader
 from .completing_reader import CompletingReader
-from .unix_console import UnixConsole, _error
+from .console import Console as ConsoleType
+
+Console: type[ConsoleType]
+_error: tuple[type[Exception], ...] | type[Exception]
+try:
+    from .unix_console import UnixConsole as Console, _error
+except ImportError:
+    from .windows_console import WindowsConsole as Console, _error
 
 ENCODING = sys.getdefaultencoding() or "latin1"
 
@@ -339,7 +346,7 @@ def __post_init__(self) -> None:
 
     def get_reader(self) -> ReadlineAlikeReader:
         if self.reader is None:
-            console = UnixConsole(self.f_in, self.f_out, encoding=ENCODING)
+            console = Console(self.f_in, self.f_out, encoding=ENCODING)
             self.reader = ReadlineAlikeReader(console=console, 
config=self.config)
         return self.reader
 
diff --git a/Lib/_pyrepl/simple_interact.py b/Lib/_pyrepl/simple_interact.py
index 11e831c1d6c5d4..c624f6e12a7094 100644
--- a/Lib/_pyrepl/simple_interact.py
+++ b/Lib/_pyrepl/simple_interact.py
@@ -34,8 +34,12 @@
 from types import ModuleType
 
 from .readline import _get_reader, multiline_input
-from .unix_console import _error
 
+_error: tuple[type[Exception], ...] | type[Exception]
+try:
+    from .unix_console import _error
+except ModuleNotFoundError:
+    from .windows_console import _error
 
 def check() -> str:
     """Returns the error message if there is a problem initializing the 
state."""
diff --git a/Lib/_pyrepl/unix_console.py b/Lib/_pyrepl/unix_console.py
index ec7d0636b9aeb3..4bdb02261982c3 100644
--- a/Lib/_pyrepl/unix_console.py
+++ b/Lib/_pyrepl/unix_console.py
@@ -143,18 +143,7 @@ def __init__(
         - term (str): Terminal name.
         - encoding (str): Encoding to use for I/O operations.
         """
-
-        self.encoding = encoding or sys.getdefaultencoding()
-
-        if isinstance(f_in, int):
-            self.input_fd = f_in
-        else:
-            self.input_fd = f_in.fileno()
-
-        if isinstance(f_out, int):
-            self.output_fd = f_out
-        else:
-            self.output_fd = f_out.fileno()
+        super().__init__(f_in, f_out, term, encoding)
 
         self.pollob = poll()
         self.pollob.register(self.input_fd, select.POLLIN)
@@ -592,14 +581,19 @@ def __write_changed_line(self, y, oldline, newline, 
px_coord):
         px_pos = 0
         j = 0
         for c in oldline:
-            if j >= px_coord: break
+            if j >= px_coord:
+                break
             j += wlen(c)
             px_pos += 1
 
         # reuse the oldline as much as possible, but stop as soon as we
         # encounter an ESCAPE, because it might be the start of an escape
         # sequene
-        while x_coord < minlen and oldline[x_pos] == newline[x_pos] and 
newline[x_pos] != "\x1b":
+        while (
+            x_coord < minlen
+            and oldline[x_pos] == newline[x_pos]
+            and newline[x_pos] != "\x1b"
+        ):
             x_coord += wlen(newline[x_pos])
             x_pos += 1
 
@@ -619,7 +613,11 @@ def __write_changed_line(self, y, oldline, newline, 
px_coord):
             self.__posxy = x_coord + character_width, y
 
         # if it's a single character change in the middle of the line
-        elif x_coord < minlen and oldline[x_pos + 1 :] == newline[x_pos + 1 :] 
and wlen(oldline[x_pos]) == wlen(newline[x_pos]):
+        elif (
+            x_coord < minlen
+            and oldline[x_pos + 1 :] == newline[x_pos + 1 :]
+            and wlen(oldline[x_pos]) == wlen(newline[x_pos])
+        ):
             character_width = wlen(newline[x_pos])
             self.__move(x_coord, y)
             self.__write(newline[x_pos])
diff --git a/Lib/_pyrepl/windows_console.py b/Lib/_pyrepl/windows_console.py
new file mode 100644
index 00000000000000..2277865e3262fc
--- /dev/null
+++ b/Lib/_pyrepl/windows_console.py
@@ -0,0 +1,587 @@
+#   Copyright 2000-2004 Michael Hudson-Doyle <mica...@gmail.com>
+#
+#                        All Rights Reserved
+#
+#
+# Permission to use, copy, modify, and distribute this software and
+# its documentation for any purpose is hereby granted without fee,
+# provided that the above copyright notice appear in all copies and
+# that both that copyright notice and this permission notice appear in
+# supporting documentation.
+#
+# THE AUTHOR MICHAEL HUDSON DISCLAIMS ALL WARRANTIES WITH REGARD TO
+# THIS SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL,
+# INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER
+# RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF
+# CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
+# CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
+
+from __future__ import annotations
+
+import io
+from multiprocessing import Value
+import os
+import sys
+
+from abc import ABC, abstractmethod
+from collections import deque
+from dataclasses import dataclass, field
+import ctypes
+from ctypes.wintypes import (
+    _COORD,
+    WORD,
+    SMALL_RECT,
+    BOOL,
+    HANDLE,
+    CHAR,
+    DWORD,
+    WCHAR,
+    SHORT,
+)
+from ctypes import Structure, POINTER, Union
+from .console import Event, Console
+from .trace import trace
+from .utils import wlen
+
+try:
+    from ctypes import GetLastError, WinDLL, windll, WinError  # type: 
ignore[attr-defined]
+except ImportError:
+    # Off Windows these ctypes names do not exist; stub them to keep MyPy happy
+    from ctypes import CDLL as WinDLL, cdll as windll
+
+    def GetLastError() -> int:
+        return 42
+
+    class WinError(OSError):  # type: ignore[no-redef]
+        def __init__(self, err: int | None, descr: str | None = None) -> None:
+            self.err = err
+            self.descr = descr
+
+
+TYPE_CHECKING = False
+
+if TYPE_CHECKING:
+    from typing import IO
+
+VK_MAP: dict[int, str] = {  # Windows virtual-key code -> pyrepl key name
+    0x23: "end",  # VK_END
+    0x24: "home",  # VK_HOME
+    0x25: "left",  # VK_LEFT
+    0x26: "up",  # VK_UP
+    0x27: "right",  # VK_RIGHT
+    0x28: "down",  # VK_DOWN
+    0x2E: "delete",  # VK_DELETE
+    0x70: "f1",  # VK_F1
+    0x71: "f2",  # VK_F2
+    0x72: "f3",  # VK_F3
+    0x73: "f4",  # VK_F4
+    0x74: "f5",  # VK_F5
+    0x75: "f6",  # VK_F6
+    0x76: "f7",  # VK_F7
+    0x77: "f8",  # VK_F8
+    0x78: "f9",  # VK_F9
+    0x79: "f10",  # VK_F10
+    0x7A: "f11",  # VK_F11
+    0x7B: "f12",  # VK_F12
+    0x7C: "f13",  # VK_F13
+    0x7D: "f14",  # VK_F14
+    0x7E: "f15",  # VK_F15
+    0x7F: "f16",  # VK_F16
+    0x80: "f17",  # VK_F17
+    0x81: "f18",  # VK_F18
+    0x82: "f19",  # VK_F19
+    0x83: "f20",  # VK_F20
+}
+
+# Console escape codes: 
https://learn.microsoft.com/en-us/windows/console/console-virtual-terminal-sequences
+ERASE_IN_LINE = "\x1b[K"
+MOVE_LEFT = "\x1b[{}D"
+MOVE_RIGHT = "\x1b[{}C"
+MOVE_UP = "\x1b[{}A"
+MOVE_DOWN = "\x1b[{}B"
+CLEAR = "\x1b[H\x1b[J"
+
+
+class _error(Exception):
+    pass
+
+
+class WindowsConsole(Console):
+    def __init__(
+        self,
+        f_in: IO[bytes] | int = 0,
+        f_out: IO[bytes] | int = 1,
+        term: str = "",
+        encoding: str = "",
+    ):
+        super().__init__(f_in, f_out, term, encoding)
+
+        SetConsoleMode(
+            OutHandle,
+            ENABLE_WRAP_AT_EOL_OUTPUT
+            | ENABLE_PROCESSED_OUTPUT
+            | ENABLE_VIRTUAL_TERMINAL_PROCESSING,
+        )
+        self.screen: list[str] = []
+        self.width = 80
+        self.height = 25
+        self.__offset = 0
+        self.event_queue: deque[Event] = deque()
+        try:
+            self.out = io._WindowsConsoleIO(self.output_fd, "w")  # type: 
ignore[attr-defined]
+        except ValueError:
+            # Console I/O is redirected, fallback...
+            self.out = None
+
+    def refresh(self, screen: list[str], c_xy: tuple[int, int]) -> None:
+        """
+        Refresh the console screen.
+
+        Parameters:
+        - screen (list): List of strings representing the screen contents.
+        - c_xy (tuple): Cursor position (x, y) on the screen.
+        """
+        cx, cy = c_xy
+
+        while len(self.screen) < min(len(screen), self.height):
+            self._hide_cursor()
+            self._move_relative(0, len(self.screen) - 1)
+            self.__write("\n")
+            self.__posxy = 0, len(self.screen)
+            self.screen.append("")
+
+        px, py = self.__posxy
+        old_offset = offset = self.__offset
+        height = self.height
+
+        # we make sure the cursor is on the screen, and that we're
+        # using all of the screen if we can
+        if cy < offset:
+            offset = cy
+        elif cy >= offset + height:
+            offset = cy - height + 1
+            scroll_lines = offset - old_offset
+
+            # Scrolling the buffer as the current input is greater than the 
visible
+            # portion of the window.  We need to scroll the visible portion 
and the
+            # entire history
+            self._scroll(scroll_lines, self._getscrollbacksize())
+            self.__posxy = self.__posxy[0], self.__posxy[1] + scroll_lines
+            self.__offset += scroll_lines
+
+            for i in range(scroll_lines):
+                self.screen.append("")
+        elif offset > 0 and len(screen) < offset + height:
+            offset = max(len(screen) - height, 0)
+            screen.append("")
+
+        oldscr = self.screen[old_offset : old_offset + height]
+        newscr = screen[offset : offset + height]
+
+        self.__offset = offset
+
+        self._hide_cursor()
+        for (
+            y,
+            oldline,
+            newline,
+        ) in zip(range(offset, offset + height), oldscr, newscr):
+            if oldline != newline:
+                self.__write_changed_line(y, oldline, newline, px)
+
+        y = len(newscr)
+        while y < len(oldscr):
+            self._move_relative(0, y)
+            self.__posxy = 0, y
+            self._erase_to_end()
+            y += 1
+
+        self._show_cursor()
+
+        self.screen = screen
+        self.move_cursor(cx, cy)
+
+    def __write_changed_line(
+        self, y: int, oldline: str, newline: str, px_coord: int
+    ) -> None:
+        # this is frustrating; there's no reason to test (say)
+        # self.dch1 inside the loop -- but alternative ways of
+        # structuring this function are equally painful (I'm trying to
+        # avoid writing code generators these days...)
+        minlen = min(wlen(oldline), wlen(newline))
+        x_pos = 0
+        x_coord = 0
+
+        px_pos = 0
+        j = 0
+        for c in oldline:
+            if j >= px_coord:
+                break
+            j += wlen(c)
+            px_pos += 1
+
+        # reuse the oldline as much as possible, but stop as soon as we
+        # encounter an ESCAPE, because it might be the start of an escape
+        # sequence
+        while (
+            x_coord < minlen
+            and oldline[x_pos] == newline[x_pos]
+            and newline[x_pos] != "\x1b"
+        ):
+            x_coord += wlen(newline[x_pos])
+            x_pos += 1
+
+        self._hide_cursor()
+        self._move_relative(x_coord, y)
+        if wlen(oldline) > wlen(newline):
+            self._erase_to_end()
+
+        self.__write(newline[x_pos:])
+        if wlen(newline) == self.width:
+            # If we wrapped we want to start at the next line
+            self._move_relative(0, y + 1)
+            self.__posxy = 0, y + 1
+        else:
+            self.__posxy = wlen(newline), y
+
+            if "\x1b" in newline or y != self.__posxy[1]:
+                # ANSI escape characters are present, so we can't assume
+                # anything about the position of the cursor.  Moving the cursor
+                # to the left margin should work to get to a known position.
+                self.move_cursor(0, y)
+
+    def _scroll(
+        self, top: int, bottom: int, left: int | None = None, right: int | 
None = None
+    ) -> None:
+        scroll_rect = SMALL_RECT()
+        scroll_rect.Top = SHORT(top)
+        scroll_rect.Bottom = SHORT(bottom)
+        scroll_rect.Left = SHORT(0 if left is None else left)
+        scroll_rect.Right = SHORT(
+            self.getheightwidth()[1] - 1 if right is None else right
+        )
+        destination_origin = _COORD()
+        fill_info = CHAR_INFO()
+        fill_info.UnicodeChar = " "
+
+        if not ScrollConsoleScreenBuffer(
+            OutHandle, scroll_rect, None, destination_origin, fill_info
+        ):
+            raise WinError(GetLastError())
+
+    def _hide_cursor(self):
+        self.__write("\x1b[?25l")
+
+    def _show_cursor(self):
+        self.__write("\x1b[?25h")
+
+    def _enable_blinking(self):
+        self.__write("\x1b[?12h")
+
+    def _disable_blinking(self):
+        self.__write("\x1b[?12l")
+
+    def __write(self, text: str) -> None:
+        if self.out is not None:
+            self.out.write(text.encode(self.encoding, "replace"))
+            self.out.flush()
+        else:
+            os.write(self.output_fd, text.encode(self.encoding, "replace"))
+
+    @property
+    def screen_xy(self) -> tuple[int, int]:
+        info = CONSOLE_SCREEN_BUFFER_INFO()
+        if not GetConsoleScreenBufferInfo(OutHandle, info):
+            raise WinError(GetLastError())
+        return info.dwCursorPosition.X, info.dwCursorPosition.Y
+
+    def _erase_to_end(self) -> None:
+        self.__write(ERASE_IN_LINE)
+
+    def prepare(self) -> None:
+        trace("prepare")
+        self.screen = []
+        self.height, self.width = self.getheightwidth()
+
+        self.__posxy = 0, 0
+        self.__gone_tall = 0
+        self.__offset = 0
+
+    def restore(self) -> None:
+        pass
+
+    def _move_relative(self, x: int, y: int) -> None:
+        """Moves relative to the current __posxy"""
+        dx = x - self.__posxy[0]
+        dy = y - self.__posxy[1]
+        if dx < 0:
+            self.__write(MOVE_LEFT.format(-dx))
+        elif dx > 0:
+            self.__write(MOVE_RIGHT.format(dx))
+
+        if dy < 0:
+            self.__write(MOVE_UP.format(-dy))
+        elif dy > 0:
+            self.__write(MOVE_DOWN.format(dy))
+
+    def move_cursor(self, x: int, y: int) -> None:
+        if x < 0 or y < 0:
+            raise ValueError(f"Bad cursor position {x}, {y}")
+
+        if y < self.__offset or y >= self.__offset + self.height:
+            self.event_queue.insert(0, Event("scroll", ""))
+        else:
+            self._move_relative(x, y)
+            self.__posxy = x, y
+
+    def set_cursor_vis(self, visible: bool) -> None:
+        if visible:
+            self._show_cursor()
+        else:
+            self._hide_cursor()
+
+    def getheightwidth(self) -> tuple[int, int]:
+        """Return (height, width) where height and width are the height
+        and width of the terminal window in characters."""
+        info = CONSOLE_SCREEN_BUFFER_INFO()
+        if not GetConsoleScreenBufferInfo(OutHandle, info):
+            raise WinError(GetLastError())
+        return (
+            info.srWindow.Bottom - info.srWindow.Top + 1,
+            info.srWindow.Right - info.srWindow.Left + 1,
+        )
+
+    def _getscrollbacksize(self) -> int:
+        info = CONSOLE_SCREEN_BUFFER_INFO()
+        if not GetConsoleScreenBufferInfo(OutHandle, info):
+            raise WinError(GetLastError())
+
+        return info.srWindow.Bottom  # type: ignore[no-any-return]
+
+    def _read_input(self) -> INPUT_RECORD | None:
+        rec = INPUT_RECORD()
+        read = DWORD()
+        if not ReadConsoleInput(InHandle, rec, 1, read):
+            raise WinError(GetLastError())
+
+        if read.value == 0:
+            return None
+
+        return rec
+
+    def get_event(self, block: bool = True) -> Event | None:
+        """Return an Event instance.  Returns None if |block| is false
+        and there is no event pending, otherwise waits for the
+        completion of an event."""
+        if self.event_queue:
+            return self.event_queue.pop()
+
+        while True:
+            rec = self._read_input()
+            if rec is None:
+                if block:
+                    continue
+                return None
+
+            if rec.EventType == WINDOW_BUFFER_SIZE_EVENT:
+                return Event("resize", "")
+
+            if rec.EventType != KEY_EVENT or not rec.Event.KeyEvent.bKeyDown:
+                # Only process keys and keydown events
+                if block:
+                    continue
+                return None
+
+            key = rec.Event.KeyEvent.uChar.UnicodeChar
+
+            if rec.Event.KeyEvent.uChar.UnicodeChar == "\r":
+                # Make enter make unix-like
+                return Event(evt="key", data="\n", raw=b"\n")
+            elif rec.Event.KeyEvent.wVirtualKeyCode == 8:
+                # Turn backspace directly into the command
+                return Event(
+                    evt="key",
+                    data="backspace",
+                    raw=rec.Event.KeyEvent.uChar.UnicodeChar,
+                )
+            elif rec.Event.KeyEvent.uChar.UnicodeChar == "\x00":
+                # Handle special keys like arrow keys and translate them into 
the appropriate command
+                code = VK_MAP.get(rec.Event.KeyEvent.wVirtualKeyCode)
+                if code:
+                    return Event(
+                        evt="key", data=code, 
raw=rec.Event.KeyEvent.uChar.UnicodeChar
+                    )
+                if block:
+                    continue
+
+                return None
+
+            return Event(evt="key", data=key, 
raw=rec.Event.KeyEvent.uChar.UnicodeChar)
+
+    def push_char(self, char: int | bytes) -> None:
+        """
+        Push a character to the console event queue.
+        """
+        raise NotImplementedError("push_char not supported on Windows")
+
+    def beep(self) -> None:
+        self.__write("\x07")
+
+    def clear(self) -> None:
+        """Wipe the screen"""
+        self.__write(CLEAR)
+        self.__posxy = 0, 0
+        self.screen = [""]
+
+    def finish(self) -> None:
+        """Move the cursor to the end of the display and otherwise get
+        ready for end.  XXX could be merged with restore?  Hmm."""
+        y = len(self.screen) - 1
+        while y >= 0 and not self.screen[y]:
+            y -= 1
+        self._move_relative(0, min(y, self.height + self.__offset - 1))
+        self.__write("\r\n")
+
+    def flushoutput(self) -> None:
+        """Flush all output to the screen (assuming there's some
+        buffering going on somewhere).
+
+        All output on Windows is unbuffered so this is a nop"""
+        pass
+
+    def forgetinput(self) -> None:
+        """Forget all pending, but not yet processed input."""
+        while self._read_input() is not None:
+            pass
+
+    def getpending(self) -> Event:
+        """Return the characters that have been typed but not yet
+        processed."""
+        return Event("key", "", b"")
+
+    def wait(self) -> None:
+        """Wait for an event."""
+        raise NotImplementedError("No wait support")
+
+    def repaint(self) -> None:
+        raise NotImplementedError("No repaint support")
+
+
+# Windows interop
+class CONSOLE_SCREEN_BUFFER_INFO(Structure):
+    _fields_ = [
+        ("dwSize", _COORD),
+        ("dwCursorPosition", _COORD),
+        ("wAttributes", WORD),
+        ("srWindow", SMALL_RECT),
+        ("dwMaximumWindowSize", _COORD),
+    ]
+
+
+class CONSOLE_CURSOR_INFO(Structure):
+    _fields_ = [
+        ("dwSize", DWORD),
+        ("bVisible", BOOL),
+    ]
+
+
+class CHAR_INFO(Structure):
+    _fields_ = [
+        ("UnicodeChar", WCHAR),
+        ("Attributes", WORD),
+    ]
+
+
+class Char(Union):
+    _fields_ = [
+        ("UnicodeChar", WCHAR),
+        ("Char", CHAR),
+    ]
+
+
+class KeyEvent(ctypes.Structure):
+    _fields_ = [
+        ("bKeyDown", BOOL),
+        ("wRepeatCount", WORD),
+        ("wVirtualKeyCode", WORD),
+        ("wVirtualScanCode", WORD),
+        ("uChar", Char),
+        ("dwControlKeyState", DWORD),
+    ]
+
+
+class WindowsBufferSizeEvent(ctypes.Structure):
+    _fields_ = [("dwSize", _COORD)]
+
+
+class ConsoleEvent(ctypes.Union):
+    _fields_ = [
+        ("KeyEvent", KeyEvent),
+        ("WindowsBufferSizeEvent", WindowsBufferSizeEvent),
+    ]
+
+
+class INPUT_RECORD(Structure):
+    _fields_ = [("EventType", WORD), ("Event", ConsoleEvent)]
+
+
+KEY_EVENT = 0x01
+FOCUS_EVENT = 0x10
+MENU_EVENT = 0x08
+MOUSE_EVENT = 0x02
+WINDOW_BUFFER_SIZE_EVENT = 0x04
+
+ENABLE_PROCESSED_OUTPUT = 0x01
+ENABLE_WRAP_AT_EOL_OUTPUT = 0x02
+ENABLE_VIRTUAL_TERMINAL_PROCESSING = 0x04
+
+STD_INPUT_HANDLE = -10
+STD_OUTPUT_HANDLE = -11
+
+if sys.platform == "win32":
+    _KERNEL32 = WinDLL("kernel32", use_last_error=True)
+
+    GetStdHandle = windll.kernel32.GetStdHandle
+    GetStdHandle.argtypes = [DWORD]
+    GetStdHandle.restype = HANDLE
+
+    GetConsoleScreenBufferInfo = _KERNEL32.GetConsoleScreenBufferInfo
+    GetConsoleScreenBufferInfo.argtypes = [
+        HANDLE,
+        ctypes.POINTER(CONSOLE_SCREEN_BUFFER_INFO),
+    ]
+    GetConsoleScreenBufferInfo.restype = BOOL
+
+    ScrollConsoleScreenBuffer = _KERNEL32.ScrollConsoleScreenBufferW
+    ScrollConsoleScreenBuffer.argtypes = [
+        HANDLE,
+        POINTER(SMALL_RECT),
+        POINTER(SMALL_RECT),
+        _COORD,
+        POINTER(CHAR_INFO),
+    ]
+    ScrollConsoleScreenBuffer.restype = BOOL
+
+    SetConsoleMode = _KERNEL32.SetConsoleMode
+    SetConsoleMode.argtypes = [HANDLE, DWORD]
+    SetConsoleMode.restype = BOOL
+
+    ReadConsoleInput = _KERNEL32.ReadConsoleInputW
+    ReadConsoleInput.argtypes = [HANDLE, POINTER(INPUT_RECORD), DWORD, 
POINTER(DWORD)]
+    ReadConsoleInput.restype = BOOL
+
+    OutHandle = GetStdHandle(STD_OUTPUT_HANDLE)
+    InHandle = GetStdHandle(STD_INPUT_HANDLE)
+else:
+
+    def _win_only(*args, **kwargs):
+        raise NotImplementedError("Windows only")
+
+    GetStdHandle = _win_only
+    GetConsoleScreenBufferInfo = _win_only
+    ScrollConsoleScreenBuffer = _win_only
+    SetConsoleMode = _win_only
+    ReadConsoleInput = _win_only
+    OutHandle = 0
+    InHandle = 0
diff --git a/Lib/test/test_pyrepl/__init__.py b/Lib/test/test_pyrepl/__init__.py
index fa38b86b847dd9..8359d9844623c2 100644
--- a/Lib/test/test_pyrepl/__init__.py
+++ b/Lib/test/test_pyrepl/__init__.py
@@ -1,12 +1,14 @@
 import os
+import sys
 from test.support import requires, load_package_tests
 from test.support.import_helper import import_module
 
-# Optionally test pyrepl.  This currently requires that the
-# 'curses' resource be given on the regrtest command line using the -u
-# option.  Additionally, we need to attempt to import curses and readline.
-requires("curses")
-curses = import_module("curses")
+if sys.platform != "win32":
+    # On non-Windows platforms, testing pyrepl currently requires that the
+    # 'curses' resource be given on the regrtest command line using the -u
+    # option.  Additionally, we need to attempt to import curses and readline.
+    requires("curses")
+    curses = import_module("curses")
 
 
 def load_tests(*args):
diff --git a/Lib/test/test_pyrepl/support.py b/Lib/test/test_pyrepl/support.py
index 75539049d43c2a..d2f5429aea7a11 100644
--- a/Lib/test/test_pyrepl/support.py
+++ b/Lib/test/test_pyrepl/support.py
@@ -55,7 +55,7 @@ def get_prompt(lineno, cursor_on_line) -> str:
     return reader
 
 
-def prepare_console(events: Iterable[Event], **kwargs):
+def prepare_console(events: Iterable[Event], **kwargs) -> MagicMock | Console:
     console = MagicMock()
     console.get_event.side_effect = events
     console.height = 100
diff --git a/Lib/test/test_pyrepl/test_pyrepl.py 
b/Lib/test/test_pyrepl/test_pyrepl.py
index 910e71d6246ac3..45114e7315749f 100644
--- a/Lib/test/test_pyrepl/test_pyrepl.py
+++ b/Lib/test/test_pyrepl/test_pyrepl.py
@@ -587,14 +587,15 @@ def prepare_reader(self, events, namespace):
         reader = ReadlineAlikeReader(console=console, config=config)
         return reader
 
+    @patch("rlcompleter._readline_available", False)
     def test_simple_completion(self):
-        events = code_to_events("os.geten\t\n")
+        events = code_to_events("os.getpid\t\n")
 
         namespace = {"os": os}
         reader = self.prepare_reader(events, namespace)
 
         output = multiline_input(reader, namespace)
-        self.assertEqual(output, "os.getenv")
+        self.assertEqual(output, "os.getpid()")
 
     def test_completion_with_many_options(self):
         # Test with something that initially displays many options
diff --git a/Lib/test/test_pyrepl/test_unix_console.py 
b/Lib/test/test_pyrepl/test_unix_console.py
index e1faa00caafc27..d0b98f17ade094 100644
--- a/Lib/test/test_pyrepl/test_unix_console.py
+++ b/Lib/test/test_pyrepl/test_unix_console.py
@@ -1,12 +1,16 @@
 import itertools
+import sys
+import unittest
 from functools import partial
 from unittest import TestCase
 from unittest.mock import MagicMock, call, patch, ANY
 
 from .support import handle_all_events, code_to_events
-from _pyrepl.console import Event
-from _pyrepl.unix_console import UnixConsole
-
+try:
+    from _pyrepl.console import Event
+    from _pyrepl.unix_console import UnixConsole
+except ImportError:
+    pass
 
 def unix_console(events, **kwargs):
     console = UnixConsole()
@@ -67,6 +71,7 @@ def unix_console(events, **kwargs):
 }
 
 
+@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows")
 @patch("_pyrepl.curses.tigetstr", lambda s: TERM_CAPABILITIES.get(s))
 @patch(
     "_pyrepl.curses.tparm",
diff --git a/Lib/test/test_pyrepl/test_unix_eventqueue.py b/Lib/test/test_pyrepl/test_unix_eventqueue.py
index c06536b4a86a04..301f79927a741f 100644
--- a/Lib/test/test_pyrepl/test_unix_eventqueue.py
+++ b/Lib/test/test_pyrepl/test_unix_eventqueue.py
@@ -1,11 +1,15 @@
 import tempfile
 import unittest
+import sys
 from unittest.mock import patch
 
-from _pyrepl.console import Event
-from _pyrepl.unix_eventqueue import EventQueue
-
+try:
+    from _pyrepl.console import Event
+    from _pyrepl.unix_eventqueue import EventQueue
+except ImportError:
+    pass
 
+@unittest.skipIf(sys.platform == "win32", "No Unix event queue on Windows")
 @patch("_pyrepl.curses.tigetstr", lambda x: b"")
 class TestUnixEventQueue(unittest.TestCase):
     def setUp(self):
diff --git a/Lib/test/test_pyrepl/test_windows_console.py b/Lib/test/test_pyrepl/test_windows_console.py
new file mode 100644
index 00000000000000..0b9014279a6f6d
--- /dev/null
+++ b/Lib/test/test_pyrepl/test_windows_console.py
@@ -0,0 +1,330 @@
+import itertools
+import sys
+import unittest
+from functools import partial
+from typing import Iterable
+from unittest import TestCase
+from unittest.mock import MagicMock, call
+
+from .support import handle_all_events, code_to_events
+
+try:
+    from _pyrepl.console import Event, Console
+    from _pyrepl.windows_console import (
+        WindowsConsole,
+        MOVE_LEFT,
+        MOVE_RIGHT,
+        MOVE_UP,
+        MOVE_DOWN,
+        ERASE_IN_LINE,
+    )
+except ImportError:
+    pass
+
+
+@unittest.skipIf(sys.platform != "win32", "Test class specifically for Windows")
+class WindowsConsoleTests(TestCase):
+    def console(self, events, **kwargs) -> Console:
+        console = WindowsConsole()
+        console.get_event = MagicMock(side_effect=events)
+        console._scroll = MagicMock()
+        console._hide_cursor = MagicMock()
+        console._show_cursor = MagicMock()
+        console._getscrollbacksize = MagicMock(42)
+        console.out = MagicMock()
+
+        height = kwargs.get("height", 25)
+        width = kwargs.get("width", 80)
+        console.getheightwidth = MagicMock(side_effect=lambda: (height, width))
+
+        console.prepare()
+        for key, val in kwargs.items():
+            setattr(console, key, val)
+        return console
+
+    def handle_events(self, events: Iterable[Event], **kwargs):
+        return handle_all_events(events, partial(self.console, **kwargs))
+
+    def handle_events_narrow(self, events):
+        return self.handle_events(events, width=5)
+
+    def handle_events_short(self, events):
+        return self.handle_events(events, height=1)
+
+    def handle_events_height_3(self, events):
+        return self.handle_events(events, height=3)
+
+    def test_simple_addition(self):
+        code = "12+34"
+        events = code_to_events(code)
+        _, con = self.handle_events(events)
+        con.out.write.assert_any_call(b"1")
+        con.out.write.assert_any_call(b"2")
+        con.out.write.assert_any_call(b"+")
+        con.out.write.assert_any_call(b"3")
+        con.out.write.assert_any_call(b"4")
+        con.restore()
+
+    def test_wrap(self):
+        code = "12+34"
+        events = code_to_events(code)
+        _, con = self.handle_events_narrow(events)
+        con.out.write.assert_any_call(b"1")
+        con.out.write.assert_any_call(b"2")
+        con.out.write.assert_any_call(b"+")
+        con.out.write.assert_any_call(b"3")
+        con.out.write.assert_any_call(b"\\")
+        con.out.write.assert_any_call(b"\n")
+        con.out.write.assert_any_call(b"4")
+        con.restore()
+
+    def test_resize_wider(self):
+        code = "1234567890"
+        events = code_to_events(code)
+        reader, console = self.handle_events_narrow(events)
+
+        console.height = 20
+        console.width = 80
+        console.getheightwidth = MagicMock(lambda _: (20, 80))
+
+        def same_reader(_):
+            return reader
+
+        def same_console(events):
+            console.get_event = MagicMock(side_effect=events)
+            return console
+
+        _, con = handle_all_events(
+            [Event(evt="resize", data=None)],
+            prepare_reader=same_reader,
+            prepare_console=same_console,
+        )
+
+        con.out.write.assert_any_call(self.move_right(2))
+        con.out.write.assert_any_call(self.move_up(2))
+        con.out.write.assert_any_call(b"567890")
+
+        con.restore()
+
+    def test_resize_narrower(self):
+        code = "1234567890"
+        events = code_to_events(code)
+        reader, console = self.handle_events(events)
+
+        console.height = 20
+        console.width = 4
+        console.getheightwidth = MagicMock(lambda _: (20, 4))
+
+        def same_reader(_):
+            return reader
+
+        def same_console(events):
+            console.get_event = MagicMock(side_effect=events)
+            return console
+
+        _, con = handle_all_events(
+            [Event(evt="resize", data=None)],
+            prepare_reader=same_reader,
+            prepare_console=same_console,
+        )
+
+        con.out.write.assert_any_call(b"456\\")
+        con.out.write.assert_any_call(b"789\\")
+
+        con.restore()
+
+    def test_cursor_left(self):
+        code = "1"
+        events = itertools.chain(
+            code_to_events(code),
+            [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
+        )
+        _, con = self.handle_events(events)
+        con.out.write.assert_any_call(self.move_left())
+        con.restore()
+
+    def test_cursor_left_right(self):
+        code = "1"
+        events = itertools.chain(
+            code_to_events(code),
+            [
+                Event(evt="key", data="left", raw=bytearray(b"\x1bOD")),
+                Event(evt="key", data="right", raw=bytearray(b"\x1bOC")),
+            ],
+        )
+        _, con = self.handle_events(events)
+        con.out.write.assert_any_call(self.move_left())
+        con.out.write.assert_any_call(self.move_right())
+        con.restore()
+
+    def test_cursor_up(self):
+        code = "1\n2+3"
+        events = itertools.chain(
+            code_to_events(code),
+            [Event(evt="key", data="up", raw=bytearray(b"\x1bOA"))],
+        )
+        _, con = self.handle_events(events)
+        con.out.write.assert_any_call(self.move_up())
+        con.restore()
+
+    def test_cursor_up_down(self):
+        code = "1\n2+3"
+        events = itertools.chain(
+            code_to_events(code),
+            [
+                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
+            ],
+        )
+        _, con = self.handle_events(events)
+        con.out.write.assert_any_call(self.move_up())
+        con.out.write.assert_any_call(self.move_down())
+        con.restore()
+
+    def test_cursor_back_write(self):
+        events = itertools.chain(
+            code_to_events("1"),
+            [Event(evt="key", data="left", raw=bytearray(b"\x1bOD"))],
+            code_to_events("2"),
+        )
+        _, con = self.handle_events(events)
+        con.out.write.assert_any_call(b"1")
+        con.out.write.assert_any_call(self.move_left())
+        con.out.write.assert_any_call(b"21")
+        con.restore()
+
+    def test_multiline_function_move_up_short_terminal(self):
+        # fmt: off
+        code = (
+            "def f():\n"
+            "  foo"
+        )
+        # fmt: on
+
+        events = itertools.chain(
+            code_to_events(code),
+            [
+                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+                Event(evt="scroll", data=None),
+            ],
+        )
+        _, con = self.handle_events_short(events)
+        con.out.write.assert_any_call(self.move_left(5))
+        con.out.write.assert_any_call(self.move_up())
+        con.restore()
+
+    def test_multiline_function_move_up_down_short_terminal(self):
+        # fmt: off
+        code = (
+            "def f():\n"
+            "  foo"
+        )
+        # fmt: on
+
+        events = itertools.chain(
+            code_to_events(code),
+            [
+                Event(evt="key", data="up", raw=bytearray(b"\x1bOA")),
+                Event(evt="scroll", data=None),
+                Event(evt="key", data="down", raw=bytearray(b"\x1bOB")),
+                Event(evt="scroll", data=None),
+            ],
+        )
+        _, con = self.handle_events_short(events)
+        con.out.write.assert_any_call(self.move_left(8))
+        con.out.write.assert_any_call(self.erase_in_line())
+        con.restore()
+
+    def test_resize_bigger_on_multiline_function(self):
+        # fmt: off
+        code = (
+            "def f():\n"
+            "  foo"
+        )
+        # fmt: on
+
+        events = itertools.chain(code_to_events(code))
+        reader, console = self.handle_events_short(events)
+
+        console.height = 2
+        console.getheightwidth = MagicMock(lambda _: (2, 80))
+
+        def same_reader(_):
+            return reader
+
+        def same_console(events):
+            console.get_event = MagicMock(side_effect=events)
+            return console
+
+        _, con = handle_all_events(
+            [Event(evt="resize", data=None)],
+            prepare_reader=same_reader,
+            prepare_console=same_console,
+        )
+        con.out.write.assert_has_calls(
+            [
+                call(self.move_left(5)),
+                call(self.move_up()),
+                call(b"def f():"),
+                call(self.move_left(3)),
+                call(self.move_down()),
+            ]
+        )
+        console.restore()
+        con.restore()
+
+    def test_resize_smaller_on_multiline_function(self):
+        # fmt: off
+        code = (
+            "def f():\n"
+            "  foo"
+        )
+        # fmt: on
+
+        events = itertools.chain(code_to_events(code))
+        reader, console = self.handle_events_height_3(events)
+
+        console.height = 1
+        console.getheightwidth = MagicMock(lambda _: (1, 80))
+
+        def same_reader(_):
+            return reader
+
+        def same_console(events):
+            console.get_event = MagicMock(side_effect=events)
+            return console
+
+        _, con = handle_all_events(
+            [Event(evt="resize", data=None)],
+            prepare_reader=same_reader,
+            prepare_console=same_console,
+        )
+        con.out.write.assert_has_calls(
+            [
+                call(self.move_left(5)),
+                call(self.move_up()),
+                call(self.erase_in_line()),
+                call(b"  foo"),
+            ]
+        )
+        console.restore()
+        con.restore()
+
+    def move_up(self, lines=1):
+        return MOVE_UP.format(lines).encode("utf8")
+
+    def move_down(self, lines=1):
+        return MOVE_DOWN.format(lines).encode("utf8")
+
+    def move_left(self, cols=1):
+        return MOVE_LEFT.format(cols).encode("utf8")
+
+    def move_right(self, cols=1):
+        return MOVE_RIGHT.format(cols).encode("utf8")
+
+    def erase_in_line(self):
+        return ERASE_IN_LINE.encode("utf8")
+
+
+if __name__ == "__main__":
+    unittest.main()
diff --git a/Misc/NEWS.d/next/Windows/2024-05-25-18-43-10.gh-issue-111201.SLPJIx.rst b/Misc/NEWS.d/next/Windows/2024-05-25-18-43-10.gh-issue-111201.SLPJIx.rst
new file mode 100644
index 00000000000000..f3918ed633d78c
--- /dev/null
+++ b/Misc/NEWS.d/next/Windows/2024-05-25-18-43-10.gh-issue-111201.SLPJIx.rst
@@ -0,0 +1 @@
+Add support for new pyrepl on Windows

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to