https://github.com/python/cpython/commit/e80abd57a82ea1beae0a82423d45c6eb8c5c5c74
commit: e80abd57a82ea1beae0a82423d45c6eb8c5c5c74
branch: main
author: Eric Snow <ericsnowcurren...@gmail.com>
committer: ericsnowcurrently <ericsnowcurren...@gmail.com>
date: 2024-02-28T16:08:08-07:00
summary:

gh-76785: Update test.support.interpreters to Align With PEP 734 (gh-115566)

This brings the code under test.support.interpreters, and the corresponding 
extension modules, in line with recent updates to PEP 734.

(Note: PEP 734 has not been accepted at this time.  However, we are using an 
internal copy of the implementation in the test suite to exercise the existing 
subinterpreters feature.)

files:
M Lib/test/support/interpreters/__init__.py
M Lib/test/support/interpreters/queues.py
M Lib/test/test_interpreters/test_api.py
M Lib/test/test_interpreters/test_channels.py
M Lib/test/test_interpreters/test_lifecycle.py
M Lib/test/test_interpreters/test_queues.py
M Lib/test/test_interpreters/utils.py
M Lib/test/test_sys.py
M Lib/test/test_threading.py
M Modules/_xxinterpqueuesmodule.c
M Modules/_xxsubinterpretersmodule.c

diff --git a/Lib/test/support/interpreters/__init__.py 
b/Lib/test/support/interpreters/__init__.py
index 15a908e9663593..d02ffbae1113c0 100644
--- a/Lib/test/support/interpreters/__init__.py
+++ b/Lib/test/support/interpreters/__init__.py
@@ -6,7 +6,7 @@
 
 # aliases:
 from _xxsubinterpreters import (
-    InterpreterError, InterpreterNotFoundError,
+    InterpreterError, InterpreterNotFoundError, NotShareableError,
     is_shareable,
 )
 
@@ -14,7 +14,8 @@
 __all__ = [
     'get_current', 'get_main', 'create', 'list_all', 'is_shareable',
     'Interpreter',
-    'InterpreterError', 'InterpreterNotFoundError', 'ExecFailure',
+    'InterpreterError', 'InterpreterNotFoundError', 'ExecutionFailed',
+    'NotShareableError',
     'create_queue', 'Queue', 'QueueEmpty', 'QueueFull',
 ]
 
@@ -42,7 +43,11 @@ def __getattr__(name):
 {formatted}
 """.strip()
 
-class ExecFailure(RuntimeError):
+class ExecutionFailed(RuntimeError):
+    """An unhandled exception happened during execution.
+
+    This is raised from Interpreter.exec() and Interpreter.call().
+    """
 
     def __init__(self, excinfo):
         msg = excinfo.formatted
@@ -157,7 +162,7 @@ def prepare_main(self, ns=None, /, **kwargs):
         ns = dict(ns, **kwargs) if ns is not None else kwargs
         _interpreters.set___main___attrs(self._id, ns)
 
-    def exec_sync(self, code, /):
+    def exec(self, code, /):
         """Run the given source code in the interpreter.
 
         This is essentially the same as calling the builtin "exec"
@@ -166,10 +171,10 @@ def exec_sync(self, code, /):
 
         There is no return value.
 
-        If the code raises an unhandled exception then an ExecFailure
-        is raised, which summarizes the unhandled exception.  The actual
-        exception is discarded because objects cannot be shared between
-        interpreters.
+        If the code raises an unhandled exception then an ExecutionFailed
+        exception is raised, which summarizes the unhandled exception.
+        The actual exception is discarded because objects cannot be
+        shared between interpreters.
 
         This blocks the current Python thread until done.  During
         that time, the previous interpreter is allowed to run
@@ -177,11 +182,35 @@ def exec_sync(self, code, /):
         """
         excinfo = _interpreters.exec(self._id, code)
         if excinfo is not None:
-            raise ExecFailure(excinfo)
+            raise ExecutionFailed(excinfo)
+
+    def call(self, callable, /):
+        """Call the object in the interpreter with given args/kwargs.
+
+        Only functions that take no arguments and have no closure
+        are supported.
 
-    def run(self, code, /):
+        The return value is discarded.
+
+        If the callable raises an exception then the error display
+        (including full traceback) is send back between the interpreters
+        and an ExecutionFailed exception is raised, much like what
+        happens with Interpreter.exec().
+        """
+        # XXX Support args and kwargs.
+        # XXX Support arbitrary callables.
+        # XXX Support returning the return value (e.g. via pickle).
+        excinfo = _interpreters.call(self._id, callable)
+        if excinfo is not None:
+            raise ExecutionFailed(excinfo)
+
+    def call_in_thread(self, callable, /):
+        """Return a new thread that calls the object in the interpreter.
+
+        The return value and any raised exception are discarded.
+        """
         def task():
-            self.exec_sync(code)
+            self.call(callable)
         t = threading.Thread(target=task)
         t.start()
         return t
diff --git a/Lib/test/support/interpreters/queues.py 
b/Lib/test/support/interpreters/queues.py
index aead0c40ca9667..2cc616be337a50 100644
--- a/Lib/test/support/interpreters/queues.py
+++ b/Lib/test/support/interpreters/queues.py
@@ -1,5 +1,6 @@
 """Cross-interpreter Queues High Level Module."""
 
+import pickle
 import queue
 import time
 import weakref
@@ -31,20 +32,26 @@ class QueueFull(_queues.QueueFull, queue.Full):
     """
 
 
-def create(maxsize=0):
+_SHARED_ONLY = 0
+_PICKLED = 1
+
+def create(maxsize=0, *, syncobj=False):
     """Return a new cross-interpreter queue.
 
     The queue may be used to pass data safely between interpreters.
+
+    "syncobj" sets the default for Queue.put()
+    and Queue.put_nowait().
     """
-    qid = _queues.create(maxsize)
-    return Queue(qid)
+    fmt = _SHARED_ONLY if syncobj else _PICKLED
+    qid = _queues.create(maxsize, fmt)
+    return Queue(qid, _fmt=fmt)
 
 
 def list_all():
     """Return a list of all open queues."""
-    return [Queue(qid)
-            for qid in _queues.list_all()]
-
+    return [Queue(qid, _fmt=fmt)
+            for qid, fmt in _queues.list_all()]
 
 
 _known_queues = weakref.WeakValueDictionary()
@@ -52,17 +59,20 @@ def list_all():
 class Queue:
     """A cross-interpreter queue."""
 
-    def __new__(cls, id, /):
+    def __new__(cls, id, /, *, _fmt=None):
         # There is only one instance for any given ID.
         if isinstance(id, int):
             id = int(id)
         else:
             raise TypeError(f'id must be an int, got {id!r}')
+        if _fmt is None:
+            _fmt = _queues.get_default_fmt(id)
         try:
             self = _known_queues[id]
         except KeyError:
             self = super().__new__(cls)
             self._id = id
+            self._fmt = _fmt
             _known_queues[id] = self
             _queues.bind(id)
         return self
@@ -105,20 +115,50 @@ def qsize(self):
         return _queues.get_count(self._id)
 
     def put(self, obj, timeout=None, *,
+            syncobj=None,
             _delay=10 / 1000,  # 10 milliseconds
             ):
         """Add the object to the queue.
 
         This blocks while the queue is full.
+
+        If "syncobj" is None (the default) then it uses the
+        queue's default, set with create_queue()..
+
+        If "syncobj" is false then all objects are supported,
+        at the expense of worse performance.
+
+        If "syncobj" is true then the object must be "shareable".
+        Examples of "shareable" objects include the builtin singletons,
+        str, and memoryview.  One benefit is that such objects are
+        passed through the queue efficiently.
+
+        The key difference, though, is conceptual: the corresponding
+        object returned from Queue.get() will be strictly equivalent
+        to the given obj.  In other words, the two objects will be
+        effectively indistinguishable from each other, even if the
+        object is mutable.  The received object may actually be the
+        same object, or a copy (immutable values only), or a proxy.
+        Regardless, the received object should be treated as though
+        the original has been shared directly, whether or not it
+        actually is.  That's a slightly different and stronger promise
+        than just (initial) equality, which is all "syncobj=False"
+        can promise.
         """
+        if syncobj is None:
+            fmt = self._fmt
+        else:
+            fmt = _SHARED_ONLY if syncobj else _PICKLED
         if timeout is not None:
             timeout = int(timeout)
             if timeout < 0:
                 raise ValueError(f'timeout value must be non-negative')
             end = time.time() + timeout
+        if fmt is _PICKLED:
+            obj = pickle.dumps(obj)
         while True:
             try:
-                _queues.put(self._id, obj)
+                _queues.put(self._id, obj, fmt)
             except _queues.QueueFull as exc:
                 if timeout is not None and time.time() >= end:
                     exc.__class__ = QueueFull
@@ -127,9 +167,15 @@ def put(self, obj, timeout=None, *,
             else:
                 break
 
-    def put_nowait(self, obj):
+    def put_nowait(self, obj, *, syncobj=None):
+        if syncobj is None:
+            fmt = self._fmt
+        else:
+            fmt = _SHARED_ONLY if syncobj else _PICKLED
+        if fmt is _PICKLED:
+            obj = pickle.dumps(obj)
         try:
-            return _queues.put(self._id, obj)
+            _queues.put(self._id, obj, fmt)
         except _queues.QueueFull as exc:
             exc.__class__ = QueueFull
             raise  # re-raise
@@ -148,12 +194,18 @@ def get(self, timeout=None, *,
             end = time.time() + timeout
         while True:
             try:
-                return _queues.get(self._id)
+                obj, fmt = _queues.get(self._id)
             except _queues.QueueEmpty as exc:
                 if timeout is not None and time.time() >= end:
                     exc.__class__ = QueueEmpty
                     raise  # re-raise
                 time.sleep(_delay)
+            else:
+                break
+        if fmt == _PICKLED:
+            obj = pickle.loads(obj)
+        else:
+            assert fmt == _SHARED_ONLY
         return obj
 
     def get_nowait(self):
diff --git a/Lib/test/test_interpreters/test_api.py 
b/Lib/test/test_interpreters/test_api.py
index aefd326977095f..363143fa810f35 100644
--- a/Lib/test/test_interpreters/test_api.py
+++ b/Lib/test/test_interpreters/test_api.py
@@ -280,7 +280,7 @@ def test_subinterpreter(self):
     def test_finished(self):
         r, w = self.pipe()
         interp = interpreters.create()
-        interp.exec_sync(f"""if True:
+        interp.exec(f"""if True:
             import os
             os.write({w}, b'x')
             """)
@@ -312,7 +312,7 @@ def test_with_only_background_threads(self):
         FINISHED = b'F'
 
         interp = interpreters.create()
-        interp.exec_sync(f"""if True:
+        interp.exec(f"""if True:
             import os
             import threading
 
@@ -326,7 +326,7 @@ def task():
         self.assertFalse(interp.is_running())
 
         os.write(w_thread, DONE)
-        interp.exec_sync('t.join()')
+        interp.exec('t.join()')
         self.assertEqual(os.read(r_interp, 1), FINISHED)
 
 
@@ -393,7 +393,7 @@ def test_from_sibling(self):
         interp2 = interpreters.create()
         self.assertEqual(set(interpreters.list_all()),
                          {main, interp1, interp2})
-        interp1.exec_sync(dedent(f"""
+        interp1.exec(dedent(f"""
             from test.support import interpreters
             interp2 = interpreters.Interpreter({interp2.id})
             interp2.close()
@@ -427,7 +427,7 @@ def test_subthreads_still_running(self):
         FINISHED = b'F'
 
         interp = interpreters.create()
-        interp.exec_sync(f"""if True:
+        interp.exec(f"""if True:
             import os
             import threading
             import time
@@ -503,27 +503,27 @@ def test_not_shareable(self):
             interp.prepare_main(spam={'spam': 'eggs', 'foo': 'bar'})
 
         # Make sure neither was actually bound.
-        with self.assertRaises(interpreters.ExecFailure):
-            interp.exec_sync('print(foo)')
-        with self.assertRaises(interpreters.ExecFailure):
-            interp.exec_sync('print(spam)')
+        with self.assertRaises(interpreters.ExecutionFailed):
+            interp.exec('print(foo)')
+        with self.assertRaises(interpreters.ExecutionFailed):
+            interp.exec('print(spam)')
 
 
-class TestInterpreterExecSync(TestBase):
+class TestInterpreterExec(TestBase):
 
     def test_success(self):
         interp = interpreters.create()
         script, file = _captured_script('print("it worked!", end="")')
         with file:
-            interp.exec_sync(script)
+            interp.exec(script)
             out = file.read()
 
         self.assertEqual(out, 'it worked!')
 
     def test_failure(self):
         interp = interpreters.create()
-        with self.assertRaises(interpreters.ExecFailure):
-            interp.exec_sync('raise Exception')
+        with self.assertRaises(interpreters.ExecutionFailed):
+            interp.exec('raise Exception')
 
     def test_display_preserved_exception(self):
         tempdir = self.temp_dir()
@@ -542,21 +542,21 @@ def script():
                 spam.eggs()
 
             interp = interpreters.create()
-            interp.exec_sync(script)
+            interp.exec(script)
             """)
 
         stdout, stderr = self.assert_python_failure(scriptfile)
         self.maxDiff = None
-        interpmod_line, = (l for l in stderr.splitlines() if ' exec_sync' in l)
-        #      File "{interpreters.__file__}", line 179, in exec_sync
+        interpmod_line, = (l for l in stderr.splitlines() if ' exec' in l)
+        #      File "{interpreters.__file__}", line 179, in exec
         self.assertEqual(stderr, dedent(f"""\
             Traceback (most recent call last):
               File "{scriptfile}", line 9, in <module>
-                interp.exec_sync(script)
-                ~~~~~~~~~~~~~~~~^^^^^^^^
+                interp.exec(script)
+                ~~~~~~~~~~~^^^^^^^^
               {interpmod_line.strip()}
-                raise ExecFailure(excinfo)
-            test.support.interpreters.ExecFailure: RuntimeError: uh-oh!
+                raise ExecutionFailed(excinfo)
+            test.support.interpreters.ExecutionFailed: RuntimeError: uh-oh!
 
             Uncaught in the interpreter:
 
@@ -578,7 +578,7 @@ def test_in_thread(self):
         script, file = _captured_script('print("it worked!", end="")')
         with file:
             def f():
-                interp.exec_sync(script)
+                interp.exec(script)
 
             t = threading.Thread(target=f)
             t.start()
@@ -604,7 +604,7 @@ def test_fork(self):
                     with open('{file.name}', 'w', encoding='utf-8') as out:
                         out.write('{expected}')
                 """)
-            interp.exec_sync(script)
+            interp.exec(script)
 
             file.seek(0)
             content = file.read()
@@ -615,17 +615,17 @@ def test_already_running(self):
         interp = interpreters.create()
         with _running(interp):
             with self.assertRaises(RuntimeError):
-                interp.exec_sync('print("spam")')
+                interp.exec('print("spam")')
 
     def test_bad_script(self):
         interp = interpreters.create()
         with self.assertRaises(TypeError):
-            interp.exec_sync(10)
+            interp.exec(10)
 
     def test_bytes_for_script(self):
         interp = interpreters.create()
         with self.assertRaises(TypeError):
-            interp.exec_sync(b'print("spam")')
+            interp.exec(b'print("spam")')
 
     def test_with_background_threads_still_running(self):
         r_interp, w_interp = self.pipe()
@@ -636,7 +636,7 @@ def test_with_background_threads_still_running(self):
         FINISHED = b'F'
 
         interp = interpreters.create()
-        interp.exec_sync(f"""if True:
+        interp.exec(f"""if True:
             import os
             import threading
 
@@ -648,46 +648,229 @@ def task():
             t.start()
             os.write({w_interp}, {RAN!r})
             """)
-        interp.exec_sync(f"""if True:
+        interp.exec(f"""if True:
             os.write({w_interp}, {RAN!r})
             """)
 
         os.write(w_thread, DONE)
-        interp.exec_sync('t.join()')
+        interp.exec('t.join()')
         self.assertEqual(os.read(r_interp, 1), RAN)
         self.assertEqual(os.read(r_interp, 1), RAN)
         self.assertEqual(os.read(r_interp, 1), FINISHED)
 
     # test_xxsubinterpreters covers the remaining
-    # Interpreter.exec_sync() behavior.
+    # Interpreter.exec() behavior.
 
 
-class TestInterpreterRun(TestBase):
-
-    def test_success(self):
-        interp = interpreters.create()
-        script, file = _captured_script('print("it worked!", end="")')
-        with file:
-            t = interp.run(script)
+def call_func_noop():
+    pass
+
+
+def call_func_return_shareable():
+    return (1, None)
+
+
+def call_func_return_not_shareable():
+    return [1, 2, 3]
+
+
+def call_func_failure():
+    raise Exception('spam!')
+
+
+def call_func_ident(value):
+    return value
+
+
+def get_call_func_closure(value):
+    def call_func_closure():
+        return value
+    return call_func_closure
+
+
+class Spam:
+
+    @staticmethod
+    def noop():
+        pass
+
+    @classmethod
+    def from_values(cls, *values):
+        return cls(values)
+
+    def __init__(self, value):
+        self.value = value
+
+    def __call__(self, *args, **kwargs):
+        return (self.value, args, kwargs)
+
+    def __eq__(self, other):
+        if not isinstance(other, Spam):
+            return NotImplemented
+        return self.value == other.value
+
+    def run(self, *args, **kwargs):
+        return (self.value, args, kwargs)
+
+
+def call_func_complex(op, /, value=None, *args, exc=None, **kwargs):
+    if exc is not None:
+        raise exc
+    if op == '':
+        raise ValueError('missing op')
+    elif op == 'ident':
+        if args or kwargs:
+            raise Exception((args, kwargs))
+        return value
+    elif op == 'full-ident':
+        return (value, args, kwargs)
+    elif op == 'globals':
+        if value is not None or args or kwargs:
+            raise Exception((value, args, kwargs))
+        return __name__
+    elif op == 'interpid':
+        if value is not None or args or kwargs:
+            raise Exception((value, args, kwargs))
+        return interpreters.get_current().id
+    elif op == 'closure':
+        if args or kwargs:
+            raise Exception((args, kwargs))
+        return get_call_func_closure(value)
+    elif op == 'custom':
+        if args or kwargs:
+            raise Exception((args, kwargs))
+        return Spam(value)
+    elif op == 'custom-inner':
+        if args or kwargs:
+            raise Exception((args, kwargs))
+        class Eggs(Spam):
+            pass
+        return Eggs(value)
+    elif not isinstance(op, str):
+        raise TypeError(op)
+    else:
+        raise NotImplementedError(op)
+
+
+class TestInterpreterCall(TestBase):
+
+    # signature
+    #  - blank
+    #  - args
+    #  - kwargs
+    #  - args, kwargs
+    # return
+    #  - nothing (None)
+    #  - simple
+    #  - closure
+    #  - custom
+    # ops:
+    #  - do nothing
+    #  - fail
+    #  - echo
+    #  - do complex, relative to interpreter
+    # scope
+    #  - global func
+    #  - local closure
+    #  - returned closure
+    #  - callable type instance
+    #  - type
+    #  - classmethod
+    #  - staticmethod
+    #  - instance method
+    # exception
+    #  - builtin
+    #  - custom
+    #  - preserves info (e.g. SyntaxError)
+    #  - matching error display
+
+    def test_call(self):
+        interp = interpreters.create()
+
+        for i, (callable, args, kwargs) in enumerate([
+            (call_func_noop, (), {}),
+            (call_func_return_shareable, (), {}),
+            (call_func_return_not_shareable, (), {}),
+            (Spam.noop, (), {}),
+        ]):
+            with self.subTest(f'success case #{i+1}'):
+                res = interp.call(callable)
+                self.assertIs(res, None)
+
+        for i, (callable, args, kwargs) in enumerate([
+            (call_func_ident, ('spamspamspam',), {}),
+            (get_call_func_closure, (42,), {}),
+            (get_call_func_closure(42), (), {}),
+            (Spam.from_values, (), {}),
+            (Spam.from_values, (1, 2, 3), {}),
+            (Spam, ('???'), {}),
+            (Spam(101), (), {}),
+            (Spam(10101).run, (), {}),
+            (call_func_complex, ('ident', 'spam'), {}),
+            (call_func_complex, ('full-ident', 'spam'), {}),
+            (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': 
'!!!'}),
+            (call_func_complex, ('globals',), {}),
+            (call_func_complex, ('interpid',), {}),
+            (call_func_complex, ('closure',), {'value': '~~~'}),
+            (call_func_complex, ('custom', 'spam!'), {}),
+            (call_func_complex, ('custom-inner', 'eggs!'), {}),
+            (call_func_complex, ('???',), {'exc': ValueError('spam')}),
+        ]):
+            with self.subTest(f'invalid case #{i+1}'):
+                with self.assertRaises(Exception):
+                    if args or kwargs:
+                        raise Exception((args, kwargs))
+                    interp.call(callable)
+
+        with self.assertRaises(interpreters.ExecutionFailed):
+            interp.call(call_func_failure)
+
+    def test_call_in_thread(self):
+        interp = interpreters.create()
+
+        for i, (callable, args, kwargs) in enumerate([
+            (call_func_noop, (), {}),
+            (call_func_return_shareable, (), {}),
+            (call_func_return_not_shareable, (), {}),
+            (Spam.noop, (), {}),
+        ]):
+            with self.subTest(f'success case #{i+1}'):
+                with self.captured_thread_exception() as ctx:
+                    t = interp.call_in_thread(callable)
+                    t.join()
+                self.assertIsNone(ctx.caught)
+
+        for i, (callable, args, kwargs) in enumerate([
+            (call_func_ident, ('spamspamspam',), {}),
+            (get_call_func_closure, (42,), {}),
+            (get_call_func_closure(42), (), {}),
+            (Spam.from_values, (), {}),
+            (Spam.from_values, (1, 2, 3), {}),
+            (Spam, ('???'), {}),
+            (Spam(101), (), {}),
+            (Spam(10101).run, (), {}),
+            (call_func_complex, ('ident', 'spam'), {}),
+            (call_func_complex, ('full-ident', 'spam'), {}),
+            (call_func_complex, ('full-ident', 'spam', 'ham'), {'eggs': 
'!!!'}),
+            (call_func_complex, ('globals',), {}),
+            (call_func_complex, ('interpid',), {}),
+            (call_func_complex, ('closure',), {'value': '~~~'}),
+            (call_func_complex, ('custom', 'spam!'), {}),
+            (call_func_complex, ('custom-inner', 'eggs!'), {}),
+            (call_func_complex, ('???',), {'exc': ValueError('spam')}),
+        ]):
+            with self.subTest(f'invalid case #{i+1}'):
+                if args or kwargs:
+                    continue
+                with self.captured_thread_exception() as ctx:
+                    t = interp.call_in_thread(callable)
+                    t.join()
+                self.assertIsNotNone(ctx.caught)
+
+        with self.captured_thread_exception() as ctx:
+            t = interp.call_in_thread(call_func_failure)
             t.join()
-            out = file.read()
-
-        self.assertEqual(out, 'it worked!')
-
-    def test_failure(self):
-        caught = False
-        def excepthook(args):
-            nonlocal caught
-            caught = True
-        threading.excepthook = excepthook
-        try:
-            interp = interpreters.create()
-            t = interp.run('raise Exception')
-            t.join()
-
-            self.assertTrue(caught)
-        except BaseException:
-            threading.excepthook = threading.__excepthook__
+        self.assertIsNotNone(ctx.caught)
 
 
 class TestIsShareable(TestBase):
diff --git a/Lib/test/test_interpreters/test_channels.py 
b/Lib/test/test_interpreters/test_channels.py
index 3c3e18832d4168..07e503837bcf75 100644
--- a/Lib/test/test_interpreters/test_channels.py
+++ b/Lib/test/test_interpreters/test_channels.py
@@ -120,7 +120,7 @@ def test_send_recv_main(self):
 
     def test_send_recv_same_interpreter(self):
         interp = interpreters.create()
-        interp.exec_sync(dedent("""
+        interp.exec(dedent("""
             from test.support.interpreters import channels
             r, s = channels.create()
             orig = b'spam'
@@ -193,7 +193,7 @@ def test_send_recv_nowait_main_with_default(self):
 
     def test_send_recv_nowait_same_interpreter(self):
         interp = interpreters.create()
-        interp.exec_sync(dedent("""
+        interp.exec(dedent("""
             from test.support.interpreters import channels
             r, s = channels.create()
             orig = b'spam'
diff --git a/Lib/test/test_interpreters/test_lifecycle.py 
b/Lib/test/test_interpreters/test_lifecycle.py
index c2917d839904f9..67b6f439c3191f 100644
--- a/Lib/test/test_interpreters/test_lifecycle.py
+++ b/Lib/test/test_interpreters/test_lifecycle.py
@@ -124,7 +124,7 @@ def test_sys_path_0(self):
             orig = sys.path[0]
 
             interp = interpreters.create()
-            interp.exec_sync(f"""if True:
+            interp.exec(f"""if True:
                 import json
                 import sys
                 print(json.dumps({{
diff --git a/Lib/test/test_interpreters/test_queues.py 
b/Lib/test/test_interpreters/test_queues.py
index 2a8ca99c1f6e3f..65b5435fb00b04 100644
--- a/Lib/test/test_interpreters/test_queues.py
+++ b/Lib/test/test_interpreters/test_queues.py
@@ -51,20 +51,20 @@ def test_shareable(self):
         queue1 = queues.create()
 
         interp = interpreters.create()
-        interp.exec_sync(dedent(f"""
+        interp.exec(dedent(f"""
             from test.support.interpreters import queues
             queue1 = queues.Queue({queue1.id})
             """));
 
         with self.subTest('same interpreter'):
             queue2 = queues.create()
-            queue1.put(queue2)
+            queue1.put(queue2, syncobj=True)
             queue3 = queue1.get()
             self.assertIs(queue3, queue2)
 
         with self.subTest('from current interpreter'):
             queue4 = queues.create()
-            queue1.put(queue4)
+            queue1.put(queue4, syncobj=True)
             out = _run_output(interp, dedent("""
                 queue4 = queue1.get()
                 print(queue4.id)
@@ -75,7 +75,7 @@ def test_shareable(self):
         with self.subTest('from subinterpreter'):
             out = _run_output(interp, dedent("""
                 queue5 = queues.create()
-                queue1.put(queue5)
+                queue1.put(queue5, syncobj=True)
                 print(queue5.id)
                 """))
             qid = int(out)
@@ -118,7 +118,7 @@ class TestQueueOps(TestBase):
     def test_empty(self):
         queue = queues.create()
         before = queue.empty()
-        queue.put(None)
+        queue.put(None, syncobj=True)
         during = queue.empty()
         queue.get()
         after = queue.empty()
@@ -133,7 +133,7 @@ def test_full(self):
         queue = queues.create(3)
         for _ in range(3):
             actual.append(queue.full())
-            queue.put(None)
+            queue.put(None, syncobj=True)
         actual.append(queue.full())
         for _ in range(3):
             queue.get()
@@ -147,16 +147,16 @@ def test_qsize(self):
         queue = queues.create()
         for _ in range(3):
             actual.append(queue.qsize())
-            queue.put(None)
+            queue.put(None, syncobj=True)
         actual.append(queue.qsize())
         queue.get()
         actual.append(queue.qsize())
-        queue.put(None)
+        queue.put(None, syncobj=True)
         actual.append(queue.qsize())
         for _ in range(3):
             queue.get()
             actual.append(queue.qsize())
-        queue.put(None)
+        queue.put(None, syncobj=True)
         actual.append(queue.qsize())
         queue.get()
         actual.append(queue.qsize())
@@ -165,30 +165,81 @@ def test_qsize(self):
 
     def test_put_get_main(self):
         expected = list(range(20))
-        queue = queues.create()
-        for i in range(20):
-            queue.put(i)
-        actual = [queue.get() for _ in range(20)]
+        for syncobj in (True, False):
+            kwds = dict(syncobj=syncobj)
+            with self.subTest(f'syncobj={syncobj}'):
+                queue = queues.create()
+                for i in range(20):
+                    queue.put(i, **kwds)
+                actual = [queue.get() for _ in range(20)]
 
-        self.assertEqual(actual, expected)
+                self.assertEqual(actual, expected)
 
     def test_put_timeout(self):
-        queue = queues.create(2)
-        queue.put(None)
-        queue.put(None)
-        with self.assertRaises(queues.QueueFull):
-            queue.put(None, timeout=0.1)
-        queue.get()
-        queue.put(None)
+        for syncobj in (True, False):
+            kwds = dict(syncobj=syncobj)
+            with self.subTest(f'syncobj={syncobj}'):
+                queue = queues.create(2)
+                queue.put(None, **kwds)
+                queue.put(None, **kwds)
+                with self.assertRaises(queues.QueueFull):
+                    queue.put(None, timeout=0.1, **kwds)
+                queue.get()
+                queue.put(None, **kwds)
 
     def test_put_nowait(self):
-        queue = queues.create(2)
-        queue.put_nowait(None)
-        queue.put_nowait(None)
-        with self.assertRaises(queues.QueueFull):
-            queue.put_nowait(None)
-        queue.get()
-        queue.put_nowait(None)
+        for syncobj in (True, False):
+            kwds = dict(syncobj=syncobj)
+            with self.subTest(f'syncobj={syncobj}'):
+                queue = queues.create(2)
+                queue.put_nowait(None, **kwds)
+                queue.put_nowait(None, **kwds)
+                with self.assertRaises(queues.QueueFull):
+                    queue.put_nowait(None, **kwds)
+                queue.get()
+                queue.put_nowait(None, **kwds)
+
+    def test_put_syncobj(self):
+        for obj in [
+            None,
+            True,
+            10,
+            'spam',
+            b'spam',
+            (0, 'a'),
+        ]:
+            with self.subTest(repr(obj)):
+                queue = queues.create()
+                queue.put(obj, syncobj=True)
+                obj2 = queue.get()
+                self.assertEqual(obj2, obj)
+
+        for obj in [
+            [1, 2, 3],
+            {'a': 13, 'b': 17},
+        ]:
+            with self.subTest(repr(obj)):
+                queue = queues.create()
+                with self.assertRaises(interpreters.NotShareableError):
+                    queue.put(obj, syncobj=True)
+
+    def test_put_not_syncobj(self):
+        for obj in [
+            None,
+            True,
+            10,
+            'spam',
+            b'spam',
+            (0, 'a'),
+            # not shareable
+            [1, 2, 3],
+            {'a': 13, 'b': 17},
+        ]:
+            with self.subTest(repr(obj)):
+                queue = queues.create()
+                queue.put(obj, syncobj=False)
+                obj2 = queue.get()
+                self.assertEqual(obj2, obj)
 
     def test_get_timeout(self):
         queue = queues.create()
@@ -200,13 +251,41 @@ def test_get_nowait(self):
         with self.assertRaises(queues.QueueEmpty):
             queue.get_nowait()
 
+    def test_put_get_default_syncobj(self):
+        expected = list(range(20))
+        queue = queues.create(syncobj=True)
+        for i in range(20):
+            queue.put(i)
+        actual = [queue.get() for _ in range(20)]
+
+        self.assertEqual(actual, expected)
+
+        obj = [1, 2, 3]  # lists are not shareable
+        with self.assertRaises(interpreters.NotShareableError):
+            queue.put(obj)
+
+    def test_put_get_default_not_syncobj(self):
+        expected = list(range(20))
+        queue = queues.create(syncobj=False)
+        for i in range(20):
+            queue.put(i)
+        actual = [queue.get() for _ in range(20)]
+
+        self.assertEqual(actual, expected)
+
+        obj = [1, 2, 3]  # lists are not shareable
+        queue.put(obj)
+        obj2 = queue.get()
+        self.assertEqual(obj, obj2)
+        self.assertIsNot(obj, obj2)
+
     def test_put_get_same_interpreter(self):
         interp = interpreters.create()
-        interp.exec_sync(dedent("""
+        interp.exec(dedent("""
             from test.support.interpreters import queues
             queue = queues.create()
             orig = b'spam'
-            queue.put(orig)
+            queue.put(orig, syncobj=True)
             obj = queue.get()
             assert obj == orig, 'expected: obj == orig'
             assert obj is not orig, 'expected: obj is not orig'
@@ -219,7 +298,7 @@ def test_put_get_different_interpreters(self):
         self.assertEqual(len(queues.list_all()), 2)
 
         obj1 = b'spam'
-        queue1.put(obj1)
+        queue1.put(obj1, syncobj=True)
 
         out = _run_output(
             interp,
@@ -236,7 +315,7 @@ def test_put_get_different_interpreters(self):
                 obj2 = b'eggs'
                 print(id(obj2))
                 assert queue2.qsize() == 0, 'expected: queue2.qsize() == 0'
-                queue2.put(obj2)
+                queue2.put(obj2, syncobj=True)
                 assert queue2.qsize() == 1, 'expected: queue2.qsize() == 1'
                 """))
         self.assertEqual(len(queues.list_all()), 2)
@@ -258,8 +337,8 @@ def test_put_cleared_with_subinterpreter(self):
                 queue = queues.Queue({queue.id})
                 obj1 = b'spam'
                 obj2 = b'eggs'
-                queue.put(obj1)
-                queue.put(obj2)
+                queue.put(obj1, syncobj=True)
+                queue.put(obj2, syncobj=True)
                 """))
         self.assertEqual(queue.qsize(), 2)
 
@@ -281,12 +360,12 @@ def f():
                     break
                 except queues.QueueEmpty:
                     continue
-            queue2.put(obj)
+            queue2.put(obj, syncobj=True)
         t = threading.Thread(target=f)
         t.start()
 
         orig = b'spam'
-        queue1.put(orig)
+        queue1.put(orig, syncobj=True)
         obj = queue2.get()
         t.join()
 
diff --git a/Lib/test/test_interpreters/utils.py 
b/Lib/test/test_interpreters/utils.py
index 3a37ed09dd8943..973d05d4f96dcb 100644
--- a/Lib/test/test_interpreters/utils.py
+++ b/Lib/test/test_interpreters/utils.py
@@ -4,8 +4,9 @@
 import subprocess
 import sys
 import tempfile
-import threading
 from textwrap import dedent
+import threading
+import types
 import unittest
 
 from test import support
@@ -41,7 +42,7 @@ def _run_output(interp, request, init=None):
     with rpipe:
         if init:
             interp.prepare_main(init)
-        interp.exec_sync(script)
+        interp.exec(script)
         return rpipe.read()
 
 
@@ -49,7 +50,7 @@ def _run_output(interp, request, init=None):
 def _running(interp):
     r, w = os.pipe()
     def run():
-        interp.exec_sync(dedent(f"""
+        interp.exec(dedent(f"""
             # wait for "signal"
             with open({r}) as rpipe:
                 rpipe.read()
@@ -84,6 +85,18 @@ def temp_dir(self):
         self.addCleanup(lambda: os_helper.rmtree(tempdir))
         return tempdir
 
+    @contextlib.contextmanager
+    def captured_thread_exception(self):
+        ctx = types.SimpleNamespace(caught=None)
+        def excepthook(args):
+            ctx.caught = args
+        orig_excepthook = threading.excepthook
+        threading.excepthook = excepthook
+        try:
+            yield ctx
+        finally:
+            threading.excepthook = orig_excepthook
+
     def make_script(self, filename, dirname=None, text=None):
         if text:
             text = dedent(text)
diff --git a/Lib/test/test_sys.py b/Lib/test/test_sys.py
index 71671a5a984256..38dcabd84d8170 100644
--- a/Lib/test/test_sys.py
+++ b/Lib/test/test_sys.py
@@ -729,7 +729,7 @@ def test_subinterp_intern_dynamically_allocated(self):
         self.assertIs(t, s)
 
         interp = interpreters.create()
-        interp.exec_sync(textwrap.dedent(f'''
+        interp.exec(textwrap.dedent(f'''
             import sys
             t = sys.intern({s!r})
             assert id(t) != {id(s)}, (id(t), {id(s)})
@@ -744,7 +744,7 @@ def test_subinterp_intern_statically_allocated(self):
         t = sys.intern(s)
 
         interp = interpreters.create()
-        interp.exec_sync(textwrap.dedent(f'''
+        interp.exec(textwrap.dedent(f'''
             import sys
             t = sys.intern({s!r})
             assert id(t) == {id(t)}, (id(t), {id(t)})
diff --git a/Lib/test/test_threading.py b/Lib/test/test_threading.py
index 1ab223b81e939e..3b5c37c948c8c3 100644
--- a/Lib/test/test_threading.py
+++ b/Lib/test/test_threading.py
@@ -1478,7 +1478,7 @@ def test_threads_join_with_no_main(self):
         DONE = b'D'
 
         interp = interpreters.create()
-        interp.exec_sync(f"""if True:
+        interp.exec(f"""if True:
             import os
             import threading
             import time
diff --git a/Modules/_xxinterpqueuesmodule.c b/Modules/_xxinterpqueuesmodule.c
index 7d8c67f49fefb8..715bb766cac624 100644
--- a/Modules/_xxinterpqueuesmodule.c
+++ b/Modules/_xxinterpqueuesmodule.c
@@ -294,6 +294,8 @@ handle_queue_error(int err, PyObject *mod, int64_t qid)
     case ERR_QUEUES_ALLOC:
         PyErr_NoMemory();
         break;
+    case -1:
+        return -1;
     default:
         state = get_module_state(mod);
         assert(state->QueueError != NULL);
@@ -320,14 +322,17 @@ struct _queueitem;
 
 typedef struct _queueitem {
     _PyCrossInterpreterData *data;
+    int fmt;
     struct _queueitem *next;
 } _queueitem;
 
 static void
-_queueitem_init(_queueitem *item, _PyCrossInterpreterData *data)
+_queueitem_init(_queueitem *item,
+                _PyCrossInterpreterData *data, int fmt)
 {
     *item = (_queueitem){
         .data = data,
+        .fmt = fmt,
     };
 }
 
@@ -344,14 +349,14 @@ _queueitem_clear(_queueitem *item)
 }
 
 static _queueitem *
-_queueitem_new(_PyCrossInterpreterData *data)
+_queueitem_new(_PyCrossInterpreterData *data, int fmt)
 {
     _queueitem *item = GLOBAL_MALLOC(_queueitem);
     if (item == NULL) {
         PyErr_NoMemory();
         return NULL;
     }
-    _queueitem_init(item, data);
+    _queueitem_init(item, data, fmt);
     return item;
 }
 
@@ -373,9 +378,11 @@ _queueitem_free_all(_queueitem *item)
 }
 
 static void
-_queueitem_popped(_queueitem *item, _PyCrossInterpreterData **p_data)
+_queueitem_popped(_queueitem *item,
+                  _PyCrossInterpreterData **p_data, int *p_fmt)
 {
     *p_data = item->data;
+    *p_fmt = item->fmt;
     // We clear them here, so they won't be released in _queueitem_clear().
     item->data = NULL;
     _queueitem_free(item);
@@ -393,10 +400,11 @@ typedef struct _queue {
         _queueitem *first;
         _queueitem *last;
     } items;
+    int fmt;
 } _queue;
 
 static int
-_queue_init(_queue *queue, Py_ssize_t maxsize)
+_queue_init(_queue *queue, Py_ssize_t maxsize, int fmt)
 {
     PyThread_type_lock mutex = PyThread_allocate_lock();
     if (mutex == NULL) {
@@ -408,6 +416,7 @@ _queue_init(_queue *queue, Py_ssize_t maxsize)
         .items = {
             .maxsize = maxsize,
         },
+        .fmt = fmt,
     };
     return 0;
 }
@@ -486,7 +495,7 @@ _queue_unlock(_queue *queue)
 }
 
 static int
-_queue_add(_queue *queue, _PyCrossInterpreterData *data)
+_queue_add(_queue *queue, _PyCrossInterpreterData *data, int fmt)
 {
     int err = _queue_lock(queue);
     if (err < 0) {
@@ -502,7 +511,7 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data)
         return ERR_QUEUE_FULL;
     }
 
-    _queueitem *item = _queueitem_new(data);
+    _queueitem *item = _queueitem_new(data, fmt);
     if (item == NULL) {
         _queue_unlock(queue);
         return -1;
@@ -522,7 +531,8 @@ _queue_add(_queue *queue, _PyCrossInterpreterData *data)
 }
 
 static int
-_queue_next(_queue *queue, _PyCrossInterpreterData **p_data)
+_queue_next(_queue *queue,
+            _PyCrossInterpreterData **p_data, int *p_fmt)
 {
     int err = _queue_lock(queue);
     if (err < 0) {
@@ -541,7 +551,7 @@ _queue_next(_queue *queue, _PyCrossInterpreterData **p_data)
     }
     queue->items.count -= 1;
 
-    _queueitem_popped(item, p_data);
+    _queueitem_popped(item, p_data, p_fmt);
 
     _queue_unlock(queue);
     return 0;
@@ -843,18 +853,26 @@ _queues_decref(_queues *queues, int64_t qid)
     PyThread_release_lock(queues->mutex);
 }
 
-static int64_t *
+struct queue_id_and_fmt {
+    int64_t id;
+    int fmt;
+};
+
+static struct queue_id_and_fmt *
 _queues_list_all(_queues *queues, int64_t *count)
 {
-    int64_t *qids = NULL;
+    struct queue_id_and_fmt *qids = NULL;
     PyThread_acquire_lock(queues->mutex, WAIT_LOCK);
-    int64_t *ids = PyMem_NEW(int64_t, (Py_ssize_t)(queues->count));
+    struct queue_id_and_fmt *ids = PyMem_NEW(struct queue_id_and_fmt,
+                                             (Py_ssize_t)(queues->count));
     if (ids == NULL) {
         goto done;
     }
     _queueref *ref = queues->head;
     for (int64_t i=0; ref != NULL; ref = ref->next, i++) {
-        ids[i] = ref->qid;
+        ids[i].id = ref->qid;
+        assert(ref->queue != NULL);
+        ids[i].fmt = ref->queue->fmt;
     }
     *count = queues->count;
 
@@ -890,13 +908,13 @@ _queue_free(_queue *queue)
 
 // Create a new queue.
 static int64_t
-queue_create(_queues *queues, Py_ssize_t maxsize)
+queue_create(_queues *queues, Py_ssize_t maxsize, int fmt)
 {
     _queue *queue = GLOBAL_MALLOC(_queue);
     if (queue == NULL) {
         return ERR_QUEUE_ALLOC;
     }
-    int err = _queue_init(queue, maxsize);
+    int err = _queue_init(queue, maxsize, fmt);
     if (err < 0) {
         GLOBAL_FREE(queue);
         return (int64_t)err;
@@ -925,7 +943,7 @@ queue_destroy(_queues *queues, int64_t qid)
 
 // Push an object onto the queue.
 static int
-queue_put(_queues *queues, int64_t qid, PyObject *obj)
+queue_put(_queues *queues, int64_t qid, PyObject *obj, int fmt)
 {
     // Look up the queue.
     _queue *queue = NULL;
@@ -948,7 +966,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj)
     }
 
     // Add the data to the queue.
-    int res = _queue_add(queue, data);
+    int res = _queue_add(queue, data, fmt);
     _queue_unmark_waiter(queue, queues->mutex);
     if (res != 0) {
         // We may chain an exception here:
@@ -963,7 +981,7 @@ queue_put(_queues *queues, int64_t qid, PyObject *obj)
 // Pop the next object off the queue.  Fail if empty.
 // XXX Support a "wait" mutex?
 static int
-queue_get(_queues *queues, int64_t qid, PyObject **res)
+queue_get(_queues *queues, int64_t qid, PyObject **res, int *p_fmt)
 {
     int err;
     *res = NULL;
@@ -979,7 +997,7 @@ queue_get(_queues *queues, int64_t qid, PyObject **res)
 
     // Pop off the next item from the queue.
     _PyCrossInterpreterData *data = NULL;
-    err = _queue_next(queue, &data);
+    err = _queue_next(queue, &data, p_fmt);
     _queue_unmark_waiter(queue, queues->mutex);
     if (err != 0) {
         return err;
@@ -1267,14 +1285,15 @@ qidarg_converter(PyObject *arg, void *ptr)
 static PyObject *
 queuesmod_create(PyObject *self, PyObject *args, PyObject *kwds)
 {
-    static char *kwlist[] = {"maxsize", NULL};
-    Py_ssize_t maxsize = -1;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, "|n:create", kwlist,
-                                     &maxsize)) {
+    static char *kwlist[] = {"maxsize", "fmt", NULL};
+    Py_ssize_t maxsize;
+    int fmt;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "ni:create", kwlist,
+                                     &maxsize, &fmt)) {
         return NULL;
     }
 
-    int64_t qid = queue_create(&_globals.queues, maxsize);
+    int64_t qid = queue_create(&_globals.queues, maxsize, fmt);
     if (qid < 0) {
         (void)handle_queue_error((int)qid, self, qid);
         return NULL;
@@ -1329,7 +1348,7 @@ static PyObject *
 queuesmod_list_all(PyObject *self, PyObject *Py_UNUSED(ignored))
 {
     int64_t count = 0;
-    int64_t *qids = _queues_list_all(&_globals.queues, &count);
+    struct queue_id_and_fmt *qids = _queues_list_all(&_globals.queues, &count);
     if (qids == NULL) {
         if (count == 0) {
             return PyList_New(0);
@@ -1340,14 +1359,14 @@ queuesmod_list_all(PyObject *self, PyObject 
*Py_UNUSED(ignored))
     if (ids == NULL) {
         goto finally;
     }
-    int64_t *cur = qids;
+    struct queue_id_and_fmt *cur = qids;
     for (int64_t i=0; i < count; cur++, i++) {
-        PyObject *qidobj = PyLong_FromLongLong(*cur);
-        if (qidobj == NULL) {
+        PyObject *item = Py_BuildValue("Li", cur->id, cur->fmt);
+        if (item == NULL) {
             Py_SETREF(ids, NULL);
             break;
         }
-        PyList_SET_ITEM(ids, (Py_ssize_t)i, qidobj);
+        PyList_SET_ITEM(ids, (Py_ssize_t)i, item);
     }
 
 finally:
@@ -1363,17 +1382,18 @@ Return the list of IDs for all queues.");
 static PyObject *
 queuesmod_put(PyObject *self, PyObject *args, PyObject *kwds)
 {
-    static char *kwlist[] = {"qid", "obj", NULL};
+    static char *kwlist[] = {"qid", "obj", "fmt", NULL};
     qidarg_converter_data qidarg;
     PyObject *obj;
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&O:put", kwlist,
-                                     qidarg_converter, &qidarg, &obj)) {
+    int fmt;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "O&Oi:put", kwlist,
+                                     qidarg_converter, &qidarg, &obj, &fmt)) {
         return NULL;
     }
     int64_t qid = qidarg.id;
 
     /* Queue up the object. */
-    int err = queue_put(&_globals.queues, qid, obj);
+    int err = queue_put(&_globals.queues, qid, obj, fmt);
     if (handle_queue_error(err, self, qid)) {
         return NULL;
     }
@@ -1382,7 +1402,7 @@ queuesmod_put(PyObject *self, PyObject *args, PyObject 
*kwds)
 }
 
 PyDoc_STRVAR(queuesmod_put_doc,
-"put(qid, obj)\n\
+"put(qid, obj, sharedonly=False)\n\
 \n\
 Add the object's data to the queue.");
 
@@ -1399,7 +1419,8 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject 
*kwds)
     int64_t qid = qidarg.id;
 
     PyObject *obj = NULL;
-    int err = queue_get(&_globals.queues, qid, &obj);
+    int fmt;
+    int err = queue_get(&_globals.queues, qid, &obj, &fmt);
     if (err == ERR_QUEUE_EMPTY && dflt != NULL) {
         assert(obj == NULL);
         obj = Py_NewRef(dflt);
@@ -1407,7 +1428,10 @@ queuesmod_get(PyObject *self, PyObject *args, PyObject 
*kwds)
     else if (handle_queue_error(err, self, qid)) {
         return NULL;
     }
-    return obj;
+
+    PyObject *res = Py_BuildValue("Oi", obj, fmt);
+    Py_DECREF(obj);
+    return res;
 }
 
 PyDoc_STRVAR(queuesmod_get_doc,
@@ -1499,6 +1523,33 @@ PyDoc_STRVAR(queuesmod_get_maxsize_doc,
 \n\
 Return the maximum number of items in the queue.");
 
+static PyObject *
+queuesmod_get_default_fmt(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"qid", NULL};
+    qidarg_converter_data qidarg;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     "O&:get_default_fmt", kwlist,
+                                     qidarg_converter, &qidarg)) {
+        return NULL;
+    }
+    int64_t qid = qidarg.id;
+
+    _queue *queue = NULL;
+    int err = _queues_lookup(&_globals.queues, qid, &queue);
+    if (handle_queue_error(err, self, qid)) {
+        return NULL;
+    }
+    int fmt = queue->fmt;
+    _queue_unmark_waiter(queue, _globals.queues.mutex);
+    return PyLong_FromLong(fmt);
+}
+
+PyDoc_STRVAR(queuesmod_get_default_fmt_doc,
+"get_default_fmt(qid)\n\
+\n\
+Return the default format to use for the queue.");
+
 static PyObject *
 queuesmod_is_full(PyObject *self, PyObject *args, PyObject *kwds)
 {
@@ -1593,6 +1644,8 @@ static PyMethodDef module_functions[] = {
      METH_VARARGS | METH_KEYWORDS, queuesmod_release_doc},
     {"get_maxsize",                _PyCFunction_CAST(queuesmod_get_maxsize),
      METH_VARARGS | METH_KEYWORDS, queuesmod_get_maxsize_doc},
+    {"get_default_fmt",            
_PyCFunction_CAST(queuesmod_get_default_fmt),
+     METH_VARARGS | METH_KEYWORDS, queuesmod_get_default_fmt_doc},
     {"is_full",                    _PyCFunction_CAST(queuesmod_is_full),
      METH_VARARGS | METH_KEYWORDS, queuesmod_is_full_doc},
     {"get_count",                  _PyCFunction_CAST(queuesmod_get_count),
diff --git a/Modules/_xxsubinterpretersmodule.c 
b/Modules/_xxsubinterpretersmodule.c
index b4004d165078f7..28c2f9c08bc0da 100644
--- a/Modules/_xxsubinterpretersmodule.c
+++ b/Modules/_xxsubinterpretersmodule.c
@@ -902,6 +902,56 @@ The code/function must not take any arguments or be a 
closure\n\
 If a function is provided, its code object is used and all its state\n\
 is ignored, including its __globals__ dict.");
 
+static PyObject *
+interp_call(PyObject *self, PyObject *args, PyObject *kwds)
+{
+    static char *kwlist[] = {"id", "callable", "args", "kwargs", NULL};
+    PyObject *id, *callable;
+    PyObject *args_obj = NULL;
+    PyObject *kwargs_obj = NULL;
+    if (!PyArg_ParseTupleAndKeywords(args, kwds,
+                                     "OO|OO:" MODULE_NAME_STR ".call", kwlist,
+                                     &id, &callable, &args_obj, &kwargs_obj)) {
+        return NULL;
+    }
+
+    if (args_obj != NULL) {
+        PyErr_SetString(PyExc_ValueError, "got unexpected args");
+        return NULL;
+    }
+    if (kwargs_obj != NULL) {
+        PyErr_SetString(PyExc_ValueError, "got unexpected kwargs");
+        return NULL;
+    }
+
+    PyObject *code = (PyObject *)convert_code_arg(callable, MODULE_NAME_STR 
".call",
+                                                  "argument 2", "a function");
+    if (code == NULL) {
+        return NULL;
+    }
+
+    PyObject *excinfo = NULL;
+    int res = _interp_exec(self, id, code, NULL, &excinfo);
+    Py_DECREF(code);
+    if (res < 0) {
+        assert((excinfo == NULL) != (PyErr_Occurred() == NULL));
+        return excinfo;
+    }
+    Py_RETURN_NONE;
+}
+
+PyDoc_STRVAR(call_doc,
+"call(id, callable, args=None, kwargs=None)\n\
+\n\
+Call the provided object in the identified interpreter.\n\
+Pass the given args and kwargs, if possible.\n\
+\n\
+\"callable\" may be a plain function with no free vars that takes\n\
+no arguments.\n\
+\n\
+The function's code object is used and all its state\n\
+is ignored, including its __globals__ dict.");
+
 static PyObject *
 interp_run_string(PyObject *self, PyObject *args, PyObject *kwds)
 {
@@ -1085,6 +1135,8 @@ static PyMethodDef module_functions[] = {
      METH_VARARGS | METH_KEYWORDS, is_running_doc},
     {"exec",                      _PyCFunction_CAST(interp_exec),
      METH_VARARGS | METH_KEYWORDS, exec_doc},
+    {"call",                      _PyCFunction_CAST(interp_call),
+     METH_VARARGS | METH_KEYWORDS, call_doc},
     {"run_string",                _PyCFunction_CAST(interp_run_string),
      METH_VARARGS | METH_KEYWORDS, run_string_doc},
     {"run_func",                  _PyCFunction_CAST(interp_run_func),
@@ -1113,6 +1165,7 @@ The 'interpreters' module provides a more convenient 
interface.");
 static int
 module_exec(PyObject *mod)
 {
+    PyInterpreterState *interp = PyInterpreterState_Get();
     module_state *state = get_module_state(mod);
 
     // exceptions
@@ -1122,6 +1175,11 @@ module_exec(PyObject *mod)
     if (PyModule_AddType(mod, (PyTypeObject *)PyExc_InterpreterNotFoundError) 
< 0) {
         goto error;
     }
+    PyObject *PyExc_NotShareableError = \
+                
_PyInterpreterState_GetXIState(interp)->PyExc_NotShareableError;
+    if (PyModule_AddType(mod, (PyTypeObject *)PyExc_NotShareableError) < 0) {
+        goto error;
+    }
 
     if (register_memoryview_xid(mod, &state->XIBufferViewType) < 0) {
         goto error;

_______________________________________________
Python-checkins mailing list -- python-checkins@python.org
To unsubscribe send an email to python-checkins-le...@python.org
https://mail.python.org/mailman3/lists/python-checkins.python.org/
Member address: arch...@mail-archive.com

Reply via email to