Author: Armin Rigo <[email protected]>
Branch: py3.5
Changeset: r89909:170cf64817b0
Date: 2017-02-03 12:53 +0100
http://bitbucket.org/pypy/pypy/changeset/170cf64817b0/
Log: Kill _multiprocessing.Connection, like it seems to have occurred in
CPython. (Just in case, hg history can let us recover it.)
diff --git a/pypy/module/_multiprocessing/__init__.py
b/pypy/module/_multiprocessing/__init__.py
--- a/pypy/module/_multiprocessing/__init__.py
+++ b/pypy/module/_multiprocessing/__init__.py
@@ -5,7 +5,6 @@
class Module(MixedModule):
interpleveldefs = {
- 'Connection' : 'interp_connection.W_FileConnection',
'SemLock' : 'interp_semaphore.W_SemLock',
'sem_unlink' : 'interp_semaphore.semaphore_unlink',
'address_of_buffer' : 'interp_memory.address_of_buffer',
@@ -15,12 +14,5 @@
}
if sys.platform == 'win32':
- interpleveldefs['PipeConnection'] = \
- 'interp_connection.W_PipeConnection'
interpleveldefs['win32'] = 'interp_win32.win32_namespace(space)'
del interpleveldefs['sem_unlink']
-
- def init(self, space):
- MixedModule.init(self, space)
- from pypy.module._multiprocessing.interp_connection import State
- space.fromcache(State).init(space)
diff --git a/pypy/module/_multiprocessing/interp_connection.py
b/pypy/module/_multiprocessing/interp_connection.py
deleted file mode 100644
--- a/pypy/module/_multiprocessing/interp_connection.py
+++ /dev/null
@@ -1,547 +0,0 @@
-import sys
-from errno import EINTR
-
-from rpython.rlib import rpoll, rsocket
-from rpython.rlib.objectmodel import we_are_translated
-from rpython.rlib.rarithmetic import intmask
-from rpython.rtyper.lltypesystem import lltype, rffi
-
-from pypy.interpreter.baseobjspace import W_Root
-from pypy.interpreter.error import OperationError, oefmt, wrap_oserror
-from pypy.interpreter.gateway import (
- WrappedDefault, interp2app, interpindirect2app, unwrap_spec)
-from pypy.interpreter.typedef import GetSetProperty, TypeDef
-
-READABLE, WRITABLE = range(1, 3)
-PY_SSIZE_T_MAX = sys.maxint
-PY_SSIZE_T_MIN = -sys.maxint - 1
-
-class State(object):
- def __init__(self, space):
- pass
-
- def init(self, space):
- w_builtins = space.getbuiltinmodule('builtins')
- w_module = space.call_method(
- w_builtins, '__import__', space.wrap("multiprocessing"))
- self.w_BufferTooShort = space.getattr(w_module,
space.wrap("BufferTooShort"))
-
- self.w_picklemodule = space.call_method(
- w_builtins, '__import__', space.wrap("pickle"))
-
-def BufferTooShort(space, w_data):
- state = space.fromcache(State)
- if not we_are_translated() and not hasattr(state, 'w_BufferTooShort'):
- state.init(space) # xxx for test/test_connection.py
- w_BufferTooShort = state.w_BufferTooShort
- return OperationError(w_BufferTooShort, w_data)
-
-def w_handle(space, handle):
- return space.wrap(rffi.cast(rffi.INTPTR_T, handle))
-
-
-class W_BaseConnection(W_Root):
- BUFFER_SIZE = 1024
- buffer = lltype.nullptr(rffi.CCHARP.TO)
-
- def __init__(self, space, flags):
- self.flags = flags
- self.buffer = lltype.malloc(rffi.CCHARP.TO, self.BUFFER_SIZE,
- flavor='raw')
- self.register_finalizer(space)
-
- def _finalize_(self):
- buf = self.buffer
- if buf:
- self.buffer = lltype.nullptr(rffi.CCHARP.TO)
- lltype.free(buf, flavor='raw')
- try:
- self.do_close()
- except OSError:
- pass
-
- # Abstract methods
- def do_close(self):
- raise NotImplementedError
- def is_valid(self):
- return False
- def do_send_string(self, space, buf, offset, size):
- raise NotImplementedError
- def do_recv_string(self, space, buflength, maxlength):
- raise NotImplementedError
- def do_poll(self, space, timeout):
- raise NotImplementedError
-
- def close(self):
- self.do_close()
-
- def closed_get(self, space):
- return space.newbool(not self.is_valid())
- def readable_get(self, space):
- return space.newbool(bool(self.flags & READABLE))
- def writable_get(self, space):
- return space.newbool(bool(self.flags & WRITABLE))
-
- def _repr(self, space, handle):
- index = self.flags - 1
- conn_type = [u"read-only", u"write-only", u"read-write"][index]
- return space.wrap(u"<%s %s, handle %d>" % (
- conn_type, space.type(self).getname(space), handle))
-
- def descr_repr(self, space):
- raise NotImplementedError
-
- def _check_readable(self, space):
- if not self.flags & READABLE:
- raise oefmt(space.w_IOError, "connection is write-only")
- def _check_writable(self, space):
- if not self.flags & WRITABLE:
- raise oefmt(space.w_IOError, "connection is read-only")
-
- @unwrap_spec(offset='index', size='index')
- def send_bytes(self, space, w_buf, offset=0, size=PY_SSIZE_T_MIN):
- buf = space.getarg_w('s*', w_buf).as_str()
- length = len(buf)
- self._check_writable(space)
- if offset < 0:
- raise oefmt(space.w_ValueError, "offset is negative")
- if length < offset:
- raise oefmt(space.w_ValueError, "buffer length < offset")
-
- if size == PY_SSIZE_T_MIN:
- size = length - offset
- elif size < 0:
- raise oefmt(space.w_ValueError, "size is negative")
- elif offset + size > length:
- raise oefmt(space.w_ValueError, "buffer length > offset + size")
-
- self.do_send_string(space, buf, offset, size)
-
- @unwrap_spec(maxlength='index')
- def recv_bytes(self, space, maxlength=PY_SSIZE_T_MAX):
- self._check_readable(space)
- if maxlength < 0:
- raise oefmt(space.w_ValueError, "maxlength < 0")
-
- res, newbuf = self.do_recv_string(
- space, self.BUFFER_SIZE, maxlength)
- try:
- if newbuf:
- return space.newbytes(rffi.charpsize2str(newbuf, res))
- else:
- return space.newbytes(rffi.charpsize2str(self.buffer, res))
- finally:
- if newbuf:
- rffi.free_charp(newbuf)
-
- @unwrap_spec(offset='index')
- def recv_bytes_into(self, space, w_buffer, offset=0):
- rwbuffer = space.writebuf_w(w_buffer)
- length = rwbuffer.getlength()
-
- res, newbuf = self.do_recv_string(
- space, length - offset, PY_SSIZE_T_MAX)
- try:
- if newbuf:
- raise BufferTooShort(space, space.newbytes(
- rffi.charpsize2str(newbuf, res)))
- rwbuffer.setslice(offset, rffi.charpsize2str(self.buffer, res))
- finally:
- if newbuf:
- rffi.free_charp(newbuf)
-
- return space.wrap(res)
-
- def send(self, space, w_obj):
- self._check_writable(space)
-
- w_picklemodule = space.fromcache(State).w_picklemodule
- w_protocol = space.getattr(
- w_picklemodule, space.wrap("HIGHEST_PROTOCOL"))
- w_pickled = space.call_method(
- w_picklemodule, "dumps", w_obj, w_protocol)
-
- buf = space.str_w(w_pickled)
- self.do_send_string(space, buf, 0, len(buf))
-
- def recv(self, space):
- self._check_readable(space)
-
- res, newbuf = self.do_recv_string(
- space, self.BUFFER_SIZE, PY_SSIZE_T_MAX)
- try:
- if newbuf:
- w_received = space.newbytes(rffi.charpsize2str(newbuf, res))
- else:
- w_received = space.newbytes(rffi.charpsize2str(self.buffer,
res))
- finally:
- if newbuf:
- rffi.free_charp(newbuf)
-
- w_builtins = space.getbuiltinmodule('builtins')
- w_picklemodule = space.fromcache(State).w_picklemodule
- w_unpickled = space.call_method(
- w_picklemodule, "loads", w_received)
-
- return w_unpickled
-
- @unwrap_spec(w_timeout=WrappedDefault(0.0))
- def poll(self, space, w_timeout):
- self._check_readable(space)
- if space.is_w(w_timeout, space.w_None):
- timeout = -1.0 # block forever
- else:
- timeout = space.float_w(w_timeout)
- if timeout < 0.0:
- timeout = 0.0
- return space.newbool(self.do_poll(space, timeout))
-
-W_BaseConnection.typedef = TypeDef(
- 'BaseConnection',
- __repr__ = interpindirect2app(W_BaseConnection.descr_repr),
- closed = GetSetProperty(W_BaseConnection.closed_get),
- readable = GetSetProperty(W_BaseConnection.readable_get),
- writable = GetSetProperty(W_BaseConnection.writable_get),
-
- send_bytes = interp2app(W_BaseConnection.send_bytes),
- recv_bytes = interp2app(W_BaseConnection.recv_bytes),
- recv_bytes_into = interp2app(W_BaseConnection.recv_bytes_into),
- send = interp2app(W_BaseConnection.send),
- recv = interp2app(W_BaseConnection.recv),
- poll = interp2app(W_BaseConnection.poll),
- close = interp2app(W_BaseConnection.close),
- )
-
-class W_FileConnection(W_BaseConnection):
- INVALID_HANDLE_VALUE = -1
- fd = INVALID_HANDLE_VALUE
-
- if sys.platform == 'win32':
- def WRITE(self, data):
- from rpython.rlib._rsocket_rffi import send, geterrno
- length = send(self.fd, data, len(data), 0)
- if length < 0:
- raise WindowsError(geterrno(), "send")
- return length
- def READ(self, size):
- from rpython.rlib._rsocket_rffi import socketrecv, geterrno
- with rffi.scoped_alloc_buffer(size) as buf:
- length = socketrecv(self.fd, buf.raw, buf.size, 0)
- if length < 0:
- raise WindowsError(geterrno(), "recv")
- return buf.str(length)
- def CLOSE(self):
- from rpython.rlib._rsocket_rffi import socketclose
- socketclose(self.fd)
- else:
- def WRITE(self, data):
- import os
- return os.write(self.fd, data)
- def READ(self, length):
- import os
- return os.read(self.fd, length)
- def CLOSE(self):
- import os
- try:
- os.close(self.fd)
- except OSError:
- pass
-
- def __init__(self, space, fd, flags):
- if fd == self.INVALID_HANDLE_VALUE or fd < 0:
- raise oefmt(space.w_IOError, "invalid handle %d", fd)
- W_BaseConnection.__init__(self, space, flags)
- self.fd = fd
-
- @unwrap_spec(fd=int, readable=bool, writable=bool)
- def descr_new_file(space, w_subtype, fd, readable=True, writable=True):
- flags = (readable and READABLE) | (writable and WRITABLE)
-
- self = space.allocate_instance(W_FileConnection, w_subtype)
- W_FileConnection.__init__(self, space, fd, flags)
- return space.wrap(self)
-
- def descr_repr(self, space):
- return self._repr(space, self.fd)
-
- def fileno(self, space):
- return space.wrap(self.fd)
-
- def is_valid(self):
- return self.fd != self.INVALID_HANDLE_VALUE
-
- def do_close(self):
- if self.is_valid():
- self.CLOSE()
- self.fd = self.INVALID_HANDLE_VALUE
-
- def do_send_string(self, space, buf, offset, size):
- # Since str2charp copies the buf anyway, always combine the
- # "header" and the "body" of the message and send them at once.
- message = lltype.malloc(rffi.CCHARP.TO, size + 4, flavor='raw')
- try:
- length = rffi.r_uint(rsocket.htonl(
- rffi.cast(lltype.Unsigned, size)))
- rffi.cast(rffi.UINTP, message)[0] = length
- i = size - 1
- while i >= 0:
- message[4 + i] = buf[offset + i]
- i -= 1
- self._sendall(space, message, size + 4)
- finally:
- lltype.free(message, flavor='raw')
-
- def do_recv_string(self, space, buflength, maxlength):
- with lltype.scoped_alloc(rffi.CArrayPtr(rffi.UINT).TO, 1) as
length_ptr:
- self._recvall(space, rffi.cast(rffi.CCHARP, length_ptr), 4)
- length = intmask(rsocket.ntohl(
- rffi.cast(lltype.Unsigned, length_ptr[0])))
- if length > maxlength: # bad message, close connection
- self.flags &= ~READABLE
- if self.flags == 0:
- self.close()
- raise oefmt(space.w_IOError, "bad message length")
-
- if length <= buflength:
- self._recvall(space, self.buffer, length)
- return length, lltype.nullptr(rffi.CCHARP.TO)
- else:
- newbuf = lltype.malloc(rffi.CCHARP.TO, length, flavor='raw')
- self._recvall(space, newbuf, length)
- return length, newbuf
-
- def _sendall(self, space, message, size):
- while size > 0:
- # XXX inefficient
- data = rffi.charpsize2str(message, size)
- try:
- count = self.WRITE(data)
- except OSError as e:
- if e.errno == EINTR:
- space.getexecutioncontext().checksignals()
- continue
- raise wrap_oserror(space, e)
- size -= count
- message = rffi.ptradd(message, count)
-
- def _recvall(self, space, buf, length):
- length = intmask(length)
- remaining = length
- while remaining > 0:
- try:
- data = self.READ(remaining)
- except OSError as e:
- if e.errno == EINTR:
- space.getexecutioncontext().checksignals()
- continue
- raise wrap_oserror(space, e)
- count = len(data)
- if count == 0:
- if remaining == length:
- raise OperationError(space.w_EOFError, space.w_None)
- else:
- raise oefmt(space.w_IOError,
- "got end of file during message")
- # XXX inefficient
- for i in range(count):
- buf[i] = data[i]
- remaining -= count
- buf = rffi.ptradd(buf, count)
-
- if sys.platform == 'win32':
- def _check_fd(self):
- return self.fd >= 0
- else:
- def _check_fd(self):
- return self.fd >= 0 and self.fd < rpoll.FD_SETSIZE
-
- def do_poll(self, space, timeout):
- if not self._check_fd():
- raise oefmt(space.w_IOError, "handle out of range in select()")
- r, w, e = rpoll.select([self.fd], [], [], timeout, handle_eintr=True)
- return bool(r)
-
-W_FileConnection.typedef = TypeDef(
- '_multiprocessing.Connection', W_BaseConnection.typedef,
- __new__ = interp2app(W_FileConnection.descr_new_file.im_func),
- fileno = interp2app(W_FileConnection.fileno),
-)
-
-class W_PipeConnection(W_BaseConnection):
- if sys.platform == 'win32':
- from rpython.rlib.rwin32 import INVALID_HANDLE_VALUE
-
- def __init__(self, space, handle, flags):
- W_BaseConnection.__init__(self, space, flags)
- self.handle = handle
-
- @unwrap_spec(readable=bool, writable=bool)
- def descr_new_pipe(space, w_subtype, w_handle, readable=True,
- writable=True):
- from pypy.module._multiprocessing.interp_win32 import handle_w
- handle = handle_w(space, w_handle)
- flags = (readable and READABLE) | (writable and WRITABLE)
-
- self = space.allocate_instance(W_PipeConnection, w_subtype)
- W_PipeConnection.__init__(self, space, handle, flags)
- return space.wrap(self)
-
- def descr_repr(self, space):
- return self._repr(space, rffi.cast(rffi.INTPTR_T, self.handle))
-
- def is_valid(self):
- return self.handle != self.INVALID_HANDLE_VALUE
-
- def fileno(self, space):
- return w_handle(space, self.handle)
-
- def do_close(self):
- from rpython.rlib.rwin32 import CloseHandle
- if self.is_valid():
- CloseHandle(self.handle)
- self.handle = self.INVALID_HANDLE_VALUE
-
- def do_send_string(self, space, buf, offset, size):
- from pypy.module._multiprocessing.interp_win32 import (
- _WriteFile, ERROR_NO_SYSTEM_RESOURCES)
- from rpython.rlib import rwin32
-
- with rffi.scoped_view_charp(buf) as charp:
- written_ptr = lltype.malloc(rffi.CArrayPtr(rwin32.DWORD).TO, 1,
- flavor='raw')
- try:
- result = _WriteFile(
- self.handle, rffi.ptradd(charp, offset),
- size, written_ptr, rffi.NULL)
-
- if (result == 0 and
- rwin32.GetLastError_saved() == ERROR_NO_SYSTEM_RESOURCES):
- raise oefmt(space.w_ValueError,
- "Cannot send %d bytes over connection", size)
- finally:
- lltype.free(written_ptr, flavor='raw')
-
- def do_recv_string(self, space, buflength, maxlength):
- from pypy.module._multiprocessing.interp_win32 import (
- _ReadFile, _PeekNamedPipe, ERROR_BROKEN_PIPE, ERROR_MORE_DATA)
- from rpython.rlib import rwin32
- from pypy.interpreter.error import wrap_windowserror
-
- read_ptr = lltype.malloc(rffi.CArrayPtr(rwin32.DWORD).TO, 1,
- flavor='raw')
- left_ptr = lltype.malloc(rffi.CArrayPtr(rwin32.DWORD).TO, 1,
- flavor='raw')
- try:
- result = _ReadFile(self.handle,
- self.buffer, min(self.BUFFER_SIZE, buflength),
- read_ptr, rffi.NULL)
- if result:
- return intmask(read_ptr[0]), lltype.nullptr(rffi.CCHARP.TO)
-
- err = rwin32.GetLastError_saved()
- if err == ERROR_BROKEN_PIPE:
- raise OperationError(space.w_EOFError, space.w_None)
- elif err != ERROR_MORE_DATA:
- raise wrap_windowserror(space, WindowsError(err, "_ReadFile"))
-
- # More data...
- if not _PeekNamedPipe(self.handle, rffi.NULL, 0,
- lltype.nullptr(rwin32.LPDWORD.TO),
- lltype.nullptr(rwin32.LPDWORD.TO),
- left_ptr):
- raise wrap_windowserror(space, rwin32.lastSavedWindowsError())
-
- length = intmask(read_ptr[0] + left_ptr[0])
- if length > maxlength: # bad message, close connection
- self.flags &= ~READABLE
- if self.flags == 0:
- self.close()
- raise oefmt(space.w_IOError, "bad message length")
-
- newbuf = lltype.malloc(rffi.CCHARP.TO, length + 1, flavor='raw')
- for i in range(read_ptr[0]):
- newbuf[i] = self.buffer[i]
-
- result = _ReadFile(self.handle,
- rffi.ptradd(newbuf, read_ptr[0]), left_ptr[0],
- read_ptr, rffi.NULL)
- if not result:
- rffi.free_charp(newbuf)
- raise wrap_windowserror(space, rwin32.lastSavedWindowsError())
-
- assert read_ptr[0] == left_ptr[0]
- return length, newbuf
- finally:
- lltype.free(read_ptr, flavor='raw')
- lltype.free(left_ptr, flavor='raw')
-
- def do_poll(self, space, timeout):
- from pypy.module._multiprocessing.interp_win32 import (
- _PeekNamedPipe, _GetTickCount, _Sleep)
- from rpython.rlib import rwin32
- from pypy.interpreter.error import wrap_windowserror
- bytes_ptr = lltype.malloc(rffi.CArrayPtr(rwin32.DWORD).TO, 1,
- flavor='raw')
- try:
- if not _PeekNamedPipe(self.handle, rffi.NULL, 0,
- lltype.nullptr(rwin32.LPDWORD.TO),
- bytes_ptr,
- lltype.nullptr(rwin32.LPDWORD.TO)):
- raise wrap_windowserror(space, rwin32.lastSavedWindowsError())
- bytes = bytes_ptr[0]
- finally:
- lltype.free(bytes_ptr, flavor='raw')
-
- if timeout == 0.0:
- return bytes > 0
-
- block = timeout < 0
- if not block:
- # XXX does not check for overflow
- deadline = intmask(_GetTickCount()) + int(1000 * timeout + 0.5)
- else:
- deadline = 0
-
- _Sleep(0)
-
- delay = 1
- while True:
- bytes_ptr = lltype.malloc(rffi.CArrayPtr(rwin32.DWORD).TO, 1,
- flavor='raw')
- try:
- if not _PeekNamedPipe(self.handle, rffi.NULL, 0,
- lltype.nullptr(rwin32.LPDWORD.TO),
- bytes_ptr,
- lltype.nullptr(rwin32.LPDWORD.TO)):
- raise wrap_windowserror(space,
- rwin32.lastSavedWindowsError())
- bytes = bytes_ptr[0]
- finally:
- lltype.free(bytes_ptr, flavor='raw')
-
- if bytes > 0:
- return True
-
- if not block:
- now = intmask(_GetTickCount())
- if now > deadline:
- return False
- diff = deadline - now
- if delay > diff:
- delay = diff
- else:
- delay += 1
-
- if delay >= 20:
- delay = 20
- _Sleep(delay)
-
- # check for signals
- # PyErr_CheckSignals()
-
-if sys.platform == 'win32':
- W_PipeConnection.typedef = TypeDef(
- '_multiprocessing.PipeConnection', W_BaseConnection.typedef,
- __new__ = interp2app(W_PipeConnection.descr_new_pipe.im_func),
- fileno = interp2app(W_PipeConnection.fileno),
- )
diff --git a/pypy/module/_multiprocessing/interp_semaphore.py
b/pypy/module/_multiprocessing/interp_semaphore.py
--- a/pypy/module/_multiprocessing/interp_semaphore.py
+++ b/pypy/module/_multiprocessing/interp_semaphore.py
@@ -13,7 +13,6 @@
from pypy.interpreter.error import oefmt, wrap_oserror
from pypy.interpreter.gateway import interp2app, unwrap_spec
from pypy.interpreter.typedef import GetSetProperty, TypeDef
-from pypy.module._multiprocessing.interp_connection import w_handle
RECURSIVE_MUTEX, SEMAPHORE = range(2)
@@ -455,7 +454,8 @@
return space.newint(self.maxvalue)
def handle_get(self, space):
- return w_handle(space, self.handle)
+ h = rffi.cast(rffi.INTPTR_T, self.handle)
+ return space.wrap(h)
def get_count(self, space):
return space.wrap(self.count)
diff --git a/pypy/module/_multiprocessing/test/test_connection.py
b/pypy/module/_multiprocessing/test/test_connection.py
deleted file mode 100644
--- a/pypy/module/_multiprocessing/test/test_connection.py
+++ /dev/null
@@ -1,200 +0,0 @@
-import py
-import sys
-from pypy.interpreter.gateway import interp2app, W_Root
-
-class TestImport:
- def test_simple(self):
- from pypy.module._multiprocessing import interp_connection
- from pypy.module._multiprocessing import interp_semaphore
-
-class AppTestBufferTooShort:
- spaceconfig = {'usemodules': ['_multiprocessing', 'thread', 'signal',
- 'select', 'struct', 'binascii',
- '_posixsubprocess']}
- if sys.platform == 'win32':
- spaceconfig['usemodules'].append('_rawffi')
- else:
- spaceconfig['usemodules'].append('fcntl')
-
-
- def setup_class(cls):
- if cls.runappdirect:
- def raiseBufferTooShort(self, data):
- import multiprocessing
- raise multiprocessing.BufferTooShort(data)
- cls.w_raiseBufferTooShort = raiseBufferTooShort
- else:
- from pypy.module._multiprocessing import interp_connection
- def raiseBufferTooShort(space, w_data):
- raise interp_connection.BufferTooShort(space, w_data)
- cls.w_raiseBufferTooShort = cls.space.wrap(
- interp2app(raiseBufferTooShort))
-
- def test_exception(self):
- import multiprocessing
- try:
- self.raiseBufferTooShort("data")
- except multiprocessing.BufferTooShort as e:
- assert isinstance(e, multiprocessing.ProcessError)
- assert e.args == ("data",)
-
-class BaseConnectionTest(object):
- def test_connection(self):
- rhandle, whandle = self.make_pair()
-
- whandle.send_bytes(b"abc")
- assert rhandle.recv_bytes(100) == b"abc"
-
- obj = [1, 2.0, "hello"]
- whandle.send(obj)
- obj2 = rhandle.recv()
- assert obj == obj2
-
- def test_poll(self):
- rhandle, whandle = self.make_pair()
-
- assert rhandle.poll() == False
- assert rhandle.poll(1) == False
- whandle.send(1)
- import time; time.sleep(0.1) # give it time to arrive :-)
- assert rhandle.poll() == True
- assert rhandle.poll(None) == True
- assert rhandle.recv() == 1
- assert rhandle.poll() == False
- raises(IOError, whandle.poll)
-
- def test_read_into(self):
- import array, multiprocessing
- rhandle, whandle = self.make_pair()
-
- obj = [1, 2.0, "hello"]
- whandle.send(obj)
- buffer = array.array('b', [0]*10)
- raises(multiprocessing.BufferTooShort, rhandle.recv_bytes_into, buffer)
- assert rhandle.readable
-
-class AppTestWinpipeConnection(BaseConnectionTest):
- spaceconfig = {
- "usemodules": [
- '_multiprocessing', 'thread', 'signal', 'struct', 'array',
- 'itertools', '_socket', 'binascii',
- ]
- }
- if sys.platform == 'win32':
- spaceconfig['usemodules'].append('_rawffi')
-
- def setup_class(cls):
- if sys.platform != "win32":
- py.test.skip("win32 only")
-
- if not cls.runappdirect:
- space = cls.space
- # stubs for some modules,
- # just for multiprocessing to import correctly on Windows
- w_modules = space.sys.get('modules')
- space.setitem(w_modules, space.wrap('msvcrt'), space.sys)
- else:
- import _multiprocessing
-
- def w_make_pair(self):
- import multiprocessing
-
- return multiprocessing.Pipe(duplex=False)
-
-
-class AppTestSocketConnection(BaseConnectionTest):
- spaceconfig = {
- "usemodules": [
- '_multiprocessing', 'thread', 'signal', 'struct', 'array',
- '_socket', 'binascii', 'select', '_posixsubprocess']
- }
- if sys.platform == 'win32':
- spaceconfig['usemodules'].append('_rawffi')
- else:
- spaceconfig['usemodules'].append('fcntl')
-
- def setup_class(cls):
- cls.w_connections = cls.space.newlist([])
-
- def w_socketpair(self):
- "A socket.socketpair() that works on Windows"
- import errno
- import socket
-
- serverSocket = socket.socket()
- serverSocket.bind(('127.0.0.1', 0))
- serverSocket.listen(1)
-
- client = socket.socket()
- client.setblocking(False)
- try:
- client.connect(('127.0.0.1', serverSocket.getsockname()[1]))
- except socket.error as e:
- assert e.args[0] in (errno.EINPROGRESS, errno.EWOULDBLOCK)
- server, addr = serverSocket.accept()
-
- # keep sockets alive during the test
- self.connections.append(server)
- self.connections.append(client)
-
- return server.fileno(), client.fileno()
-
- def w_make_pair(self):
- import _multiprocessing
-
- fd1, fd2 = self.socketpair()
- rhandle = _multiprocessing.Connection(fd1, writable=False)
- whandle = _multiprocessing.Connection(fd2, readable=False)
- self.connections.append(rhandle)
- self.connections.append(whandle)
- return rhandle, whandle
-
- def teardown_method(self, func):
- # Work hard to close all sockets and connections now!
- # since the fd is probably already closed, another unrelated
- # part of the program will probably reuse it;
- # And any object forgotten here will close it on destruction...
- try:
- w_connections = self.w_connections
- except AttributeError:
- return
- space = self.space
- for c in space.unpackiterable(w_connections):
- if isinstance(c, W_Root):
- space.call_method(c, "close")
- else:
- c.close()
- space.delslice(w_connections, space.wrap(0), space.wrap(100))
-
- def test_bad_fd(self):
- import _multiprocessing
-
- raises(IOError, _multiprocessing.Connection, -1)
- raises(IOError, _multiprocessing.Connection, -15)
-
- def test_byte_order(self):
- import socket
- if not 'fromfd' in dir(socket):
- skip('No fromfd in socket')
- # The exact format of net strings (length in network byte
- # order) is important for interoperation with others
- # implementations.
- rhandle, whandle = self.make_pair()
- whandle.send_bytes(b"abc")
- whandle.send_bytes(b"defg")
- sock = socket.fromfd(rhandle.fileno(),
- socket.AF_INET, socket.SOCK_STREAM)
- data1 = sock.recv(7)
- assert data1 == b'\x00\x00\x00\x03abc'
- data2 = sock.recv(8)
- assert data2 == b'\x00\x00\x00\x04defg'
-
- def test_repr(self):
- import _multiprocessing, os
- fd = os.dup(1) # closed by Connection.__del__
- c = _multiprocessing.Connection(fd)
- assert repr(c) == '<read-write Connection, handle %d>' % fd
- if hasattr(_multiprocessing, 'PipeConnection'):
- fd = os.dup(1) # closed by PipeConnection.__del__
- c = _multiprocessing.PipeConnection(fd)
- assert repr(c) == '<read-write PipeConnection, handle %d>' % fd
_______________________________________________
pypy-commit mailing list
[email protected]
https://mail.python.org/mailman/listinfo/pypy-commit