Author: Alex Gaynor <[email protected]>
Branch:
Changeset: r53208:90cf18134d62
Date: 2012-03-04 14:41 -0500
http://bitbucket.org/pypy/pypy/changeset/90cf18134d62/
Log: Merged kqueue branch. Thanks to oberstet!
diff --git a/pypy/module/select/__init__.py b/pypy/module/select/__init__.py
--- a/pypy/module/select/__init__.py
+++ b/pypy/module/select/__init__.py
@@ -22,6 +22,13 @@
if value is not None:
interpleveldefs[symbol] = "space.wrap(%r)" % value
+ if 'bsd' in sys.platform or sys.platform.startswith('darwin'):
+ interpleveldefs["kqueue"] = "interp_kqueue.W_Kqueue"
+ interpleveldefs["kevent"] = "interp_kqueue.W_Kevent"
+ from pypy.module.select.interp_kqueue import symbol_map
+ for symbol in symbol_map:
+ interpleveldefs[symbol] = "space.wrap(interp_kqueue.%s)" % symbol
+
def buildloaders(cls):
from pypy.rlib import rpoll
for name in rpoll.eventnames:
diff --git a/pypy/module/select/interp_kqueue.py
b/pypy/module/select/interp_kqueue.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/select/interp_kqueue.py
@@ -0,0 +1,344 @@
+from pypy.interpreter.baseobjspace import Wrappable
+from pypy.interpreter.error import OperationError, operationerrfmt,
exception_from_errno
+from pypy.interpreter.gateway import interp2app, unwrap_spec
+from pypy.interpreter.typedef import TypeDef, generic_new_descr, GetSetProperty
+from pypy.rlib._rsocket_rffi import socketclose
+from pypy.rpython.lltypesystem import rffi, lltype
+from pypy.rpython.tool import rffi_platform
+from pypy.translator.tool.cbuild import ExternalCompilationInfo
+
+
+eci = ExternalCompilationInfo(
+ includes = ["sys/types.h",
+ "sys/event.h",
+ "sys/time.h"],
+)
+
+
+class CConfig:
+ _compilation_info_ = eci
+
+
+CConfig.kevent = rffi_platform.Struct("struct kevent", [
+ ("ident", rffi.UINTPTR_T),
+ ("filter", rffi.SHORT),
+ ("flags", rffi.USHORT),
+ ("fflags", rffi.UINT),
+ ("data", rffi.INTPTR_T),
+ ("udata", rffi.VOIDP),
+])
+
+
+CConfig.timespec = rffi_platform.Struct("struct timespec", [
+ ("tv_sec", rffi.TIME_T),
+ ("tv_nsec", rffi.LONG),
+])
+
+
+symbol_map = {
+ "KQ_FILTER_READ": "EVFILT_READ",
+ "KQ_FILTER_WRITE": "EVFILT_WRITE",
+ "KQ_FILTER_AIO": "EVFILT_AIO",
+ "KQ_FILTER_VNODE": "EVFILT_VNODE",
+ "KQ_FILTER_PROC": "EVFILT_PROC",
+# "KQ_FILTER_NETDEV": None, # deprecated on FreeBSD .. no longer defined
+ "KQ_FILTER_SIGNAL": "EVFILT_SIGNAL",
+ "KQ_FILTER_TIMER": "EVFILT_TIMER",
+ "KQ_EV_ADD": "EV_ADD",
+ "KQ_EV_DELETE": "EV_DELETE",
+ "KQ_EV_ENABLE": "EV_ENABLE",
+ "KQ_EV_DISABLE": "EV_DISABLE",
+ "KQ_EV_ONESHOT": "EV_ONESHOT",
+ "KQ_EV_CLEAR": "EV_CLEAR",
+# "KQ_EV_SYSFLAGS": None, # Python docs says "internal event" .. not
defined on FreeBSD
+# "KQ_EV_FLAG1": None, # Python docs says "internal event" .. not defined
on FreeBSD
+ "KQ_EV_EOF": "EV_EOF",
+ "KQ_EV_ERROR": "EV_ERROR"
+}
+
+for symbol in symbol_map.values():
+ setattr(CConfig, symbol, rffi_platform.DefinedConstantInteger(symbol))
+
+cconfig = rffi_platform.configure(CConfig)
+
+kevent = cconfig["kevent"]
+timespec = cconfig["timespec"]
+
+for symbol in symbol_map:
+ globals()[symbol] = cconfig[symbol_map[symbol]]
+
+
+syscall_kqueue = rffi.llexternal(
+ "kqueue",
+ [],
+ rffi.INT,
+ compilation_info=eci
+)
+
+syscall_kevent = rffi.llexternal(
+ "kevent",
+ [rffi.INT,
+ lltype.Ptr(rffi.CArray(kevent)),
+ rffi.INT,
+ lltype.Ptr(rffi.CArray(kevent)),
+ rffi.INT,
+ lltype.Ptr(timespec)
+ ],
+ rffi.INT,
+ compilation_info=eci
+)
+
+
+class W_Kqueue(Wrappable):
+ def __init__(self, space, kqfd):
+ self.kqfd = kqfd
+
+ def descr__new__(space, w_subtype):
+ kqfd = syscall_kqueue()
+ if kqfd < 0:
+ raise exception_from_errno(space, space.w_IOError)
+ return space.wrap(W_Kqueue(space, kqfd))
+
+ @unwrap_spec(fd=int)
+ def descr_fromfd(space, w_cls, fd):
+ return space.wrap(W_Kqueue(space, fd))
+
+ def __del__(self):
+ self.close()
+
+ def get_closed(self):
+ return self.kqfd < 0
+
+ def close(self):
+ if not self.get_closed():
+ kqfd = self.kqfd
+ self.kqfd = -1
+ socketclose(kqfd)
+
+ def check_closed(self, space):
+ if self.get_closed():
+ raise OperationError(space.w_ValueError, space.wrap("I/O operation
on closed kqueue fd"))
+
+ def descr_get_closed(self, space):
+ return space.wrap(self.get_closed())
+
+ def descr_fileno(self, space):
+ self.check_closed(space)
+ return space.wrap(self.kqfd)
+
+ def descr_close(self, space):
+ self.close()
+
+ @unwrap_spec(max_events=int)
+ def descr_control(self, space, w_changelist, max_events, w_timeout=None):
+
+ self.check_closed(space)
+
+ if max_events < 0:
+ raise operationerrfmt(space.w_ValueError,
+ "Length of eventlist must be 0 or positive, got %d", max_events
+ )
+
+ if space.is_w(w_changelist, space.w_None):
+ changelist_len = 0
+ else:
+ changelist_len = space.len_w(w_changelist)
+
+ with lltype.scoped_alloc(rffi.CArray(kevent), changelist_len) as
changelist, \
+ lltype.scoped_alloc(rffi.CArray(kevent), max_events) as
eventlist, \
+ lltype.scoped_alloc(timespec) as timeout:
+
+ if not space.is_w(w_timeout, space.w_None):
+ _timeout = space.float_w(w_timeout)
+ if _timeout < 0:
+ raise operationerrfmt(space.w_ValueError,
+ "Timeout must be None or >= 0, got %s", str(_timeout)
+ )
+ sec = int(_timeout)
+ nsec = int(1e9 * (_timeout - sec))
+ rffi.setintfield(timeout, 'c_tv_sec', sec)
+ rffi.setintfield(timeout, 'c_tv_nsec', nsec)
+ ptimeout = timeout
+ else:
+ ptimeout = lltype.nullptr(timespec)
+
+ if not space.is_w(w_changelist, space.w_None):
+ i = 0
+ for w_ev in space.listview(w_changelist):
+ ev = space.interp_w(W_Kevent, w_ev)
+ changelist[i].c_ident = ev.event.c_ident
+ changelist[i].c_filter = ev.event.c_filter
+ changelist[i].c_flags = ev.event.c_flags
+ changelist[i].c_fflags = ev.event.c_fflags
+ changelist[i].c_data = ev.event.c_data
+ changelist[i].c_udata = ev.event.c_udata
+ i += 1
+ pchangelist = changelist
+ else:
+ pchangelist = lltype.nullptr(rffi.CArray(kevent))
+
+ nfds = syscall_kevent(self.kqfd,
+ pchangelist,
+ changelist_len,
+ eventlist,
+ max_events,
+ ptimeout)
+ if nfds < 0:
+ raise exception_from_errno(space, space.w_IOError)
+ else:
+ elist_w = [None] * nfds
+ for i in xrange(nfds):
+
+ evt = eventlist[i]
+
+ w_event = W_Kevent(space)
+ w_event.event = lltype.malloc(kevent, flavor="raw")
+ w_event.event.c_ident = evt.c_ident
+ w_event.event.c_filter = evt.c_filter
+ w_event.event.c_flags = evt.c_flags
+ w_event.event.c_fflags = evt.c_fflags
+ w_event.event.c_data = evt.c_data
+ w_event.event.c_udata = evt.c_udata
+
+ elist_w[i] = w_event
+
+ return space.newlist(elist_w)
+
+
+
+W_Kqueue.typedef = TypeDef("select.kqueue",
+ __new__ = interp2app(W_Kqueue.descr__new__.im_func),
+ fromfd = interp2app(W_Kqueue.descr_fromfd.im_func, as_classmethod=True),
+
+ closed = GetSetProperty(W_Kqueue.descr_get_closed),
+ fileno = interp2app(W_Kqueue.descr_fileno),
+
+ close = interp2app(W_Kqueue.descr_close),
+ control = interp2app(W_Kqueue.descr_control),
+)
+W_Kqueue.typedef.acceptable_as_base_class = False
+
+
+class W_Kevent(Wrappable):
+ def __init__(self, space):
+ self.event = lltype.nullptr(kevent)
+
+ def __del__(self):
+ if self.event:
+ lltype.free(self.event, flavor="raw")
+
+ @unwrap_spec(filter=int, flags=rffi.r_uint, fflags=rffi.r_uint, data=int,
udata=rffi.r_uint)
+ def descr__init__(self, space, w_ident, filter=KQ_FILTER_READ,
flags=KQ_EV_ADD, fflags=0, data=0, udata=0):
+ ident = space.c_filedescriptor_w(w_ident)
+
+ self.event = lltype.malloc(kevent, flavor="raw")
+ rffi.setintfield(self.event, "c_ident", ident)
+ rffi.setintfield(self.event, "c_filter", filter)
+ rffi.setintfield(self.event, "c_flags", flags)
+ rffi.setintfield(self.event, "c_fflags", fflags)
+ rffi.setintfield(self.event, "c_data", data)
+ self.event.c_udata = rffi.cast(rffi.VOIDP, udata)
+
+ def _compare_all_fields(self, other, op):
+ l_ident = self.event.c_ident
+ r_ident = other.event.c_ident
+ l_filter = rffi.cast(lltype.Signed, self.event.c_filter)
+ r_filter = rffi.cast(lltype.Signed, other.event.c_filter)
+ l_flags = rffi.cast(lltype.Unsigned, self.event.c_flags)
+ r_flags = rffi.cast(lltype.Unsigned, other.event.c_flags)
+ l_fflags = self.event.c_fflags
+ r_fflags = other.event.c_fflags
+ l_data = self.event.c_data
+ r_data = other.event.c_data
+ l_udata = rffi.cast(lltype.Unsigned, self.event.c_udata)
+ r_udata = rffi.cast(lltype.Unsigned, other.event.c_udata)
+
+ if op == "eq":
+ return l_ident == r_ident and \
+ l_filter == r_filter and \
+ l_flags == r_flags and \
+ l_fflags == r_fflags and \
+ l_data == r_data and \
+ l_udata == r_udata
+ elif op == "lt":
+ return (l_ident < r_ident) or \
+ (l_ident == r_ident and l_filter < r_filter) or \
+ (l_ident == r_ident and l_filter == r_filter and l_flags <
r_flags) or \
+ (l_ident == r_ident and l_filter == r_filter and l_flags ==
r_flags and l_fflags < r_fflags) or \
+ (l_ident == r_ident and l_filter == r_filter and l_flags ==
r_flags and l_fflags == r_fflags and l_data < r_data) or \
+ (l_ident == r_ident and l_filter == r_filter and l_flags ==
r_flags and l_fflags == r_fflags and l_data == r_data and l_udata < r_udata)
+ elif op == "gt":
+ return (l_ident > r_ident) or \
+ (l_ident == r_ident and l_filter > r_filter) or \
+ (l_ident == r_ident and l_filter == r_filter and l_flags >
r_flags) or \
+ (l_ident == r_ident and l_filter == r_filter and l_flags ==
r_flags and l_fflags > r_fflags) or \
+ (l_ident == r_ident and l_filter == r_filter and l_flags ==
r_flags and l_fflags == r_fflags and l_data > r_data) or \
+ (l_ident == r_ident and l_filter == r_filter and l_flags ==
r_flags and l_fflags == r_fflags and l_data == r_data and l_udata > r_udata)
+ else:
+ assert False
+
+ def compare_all_fields(self, space, other, op):
+ if not space.interp_w(W_Kevent, other):
+ if op == "eq":
+ return False
+ elif op == "ne":
+ return True
+ else:
+ raise OperationError(space.w_TypeError, space.wrap('cannot
compare kevent to incompatible type'))
+ return self._compare_all_fields(space.interp_w(W_Kevent, other), op)
+
+ def descr__eq__(self, space, w_other):
+ return space.wrap(self.compare_all_fields(space, w_other, "eq"))
+
+ def descr__ne__(self, space, w_other):
+ return space.wrap(not self.compare_all_fields(space, w_other, "eq"))
+
+ def descr__le__(self, space, w_other):
+ return space.wrap(not self.compare_all_fields(space, w_other, "gt"))
+
+ def descr__lt__(self, space, w_other):
+ return space.wrap(self.compare_all_fields(space, w_other, "lt"))
+
+ def descr__ge__(self, space, w_other):
+ return space.wrap(not self.compare_all_fields(space, w_other, "lt"))
+
+ def descr__gt__(self, space, w_other):
+ return space.wrap(self.compare_all_fields(space, w_other, "gt"))
+
+ def descr_get_ident(self, space):
+ return space.wrap(self.event.c_ident)
+
+ def descr_get_filter(self, space):
+ return space.wrap(self.event.c_filter)
+
+ def descr_get_flags(self, space):
+ return space.wrap(self.event.c_flags)
+
+ def descr_get_fflags(self, space):
+ return space.wrap(self.event.c_fflags)
+
+ def descr_get_data(self, space):
+ return space.wrap(self.event.c_data)
+
+ def descr_get_udata(self, space):
+ return space.wrap(rffi.cast(rffi.SIZE_T, self.event.c_udata))
+
+
+W_Kevent.typedef = TypeDef("select.kevent",
+ __new__ = generic_new_descr(W_Kevent),
+ __init__ = interp2app(W_Kevent.descr__init__),
+ __eq__ = interp2app(W_Kevent.descr__eq__),
+ __ne__ = interp2app(W_Kevent.descr__ne__),
+ __le__ = interp2app(W_Kevent.descr__le__),
+ __lt__ = interp2app(W_Kevent.descr__lt__),
+ __ge__ = interp2app(W_Kevent.descr__ge__),
+ __gt__ = interp2app(W_Kevent.descr__gt__),
+
+ ident = GetSetProperty(W_Kevent.descr_get_ident),
+ filter = GetSetProperty(W_Kevent.descr_get_filter),
+ flags = GetSetProperty(W_Kevent.descr_get_flags),
+ fflags = GetSetProperty(W_Kevent.descr_get_fflags),
+ data = GetSetProperty(W_Kevent.descr_get_data),
+ udata = GetSetProperty(W_Kevent.descr_get_udata),
+)
+W_Kevent.typedef.acceptable_as_base_class = False
diff --git a/pypy/module/select/test/test_kqueue.py
b/pypy/module/select/test/test_kqueue.py
new file mode 100644
--- /dev/null
+++ b/pypy/module/select/test/test_kqueue.py
@@ -0,0 +1,190 @@
+# adapted from CPython: Lib/test/test_kqueue.py
+
+import py
+import sys
+
+from pypy.conftest import gettestobjspace
+
+
+class AppTestKqueue(object):
+ def setup_class(cls):
+ if not 'bsd' in sys.platform and \
+ not sys.platform.startswith('darwin'):
+ py.test.skip("test requires BSD")
+ cls.space = gettestobjspace(usemodules=["select", "_socket", "posix"])
+
+ def test_create(self):
+ import select
+
+ kq = select.kqueue()
+ assert kq.fileno() > 0
+ assert not kq.closed
+ kq.close()
+ assert kq.closed
+ raises(ValueError, kq.fileno)
+
+ def test_create_event(self):
+ import select
+ import sys
+
+ fd = sys.stderr.fileno()
+ ev = select.kevent(fd)
+ other = select.kevent(1000)
+ assert ev.ident == fd
+ assert ev.filter == select.KQ_FILTER_READ
+ assert ev.flags == select.KQ_EV_ADD
+ assert ev.fflags == 0
+ assert ev.data == 0
+ assert ev.udata == 0
+ assert ev == ev
+ assert ev != other
+ assert cmp(ev, other) == -1
+ assert ev < other
+ assert other >= ev
+ raises(TypeError, cmp, ev, None)
+ raises(TypeError, cmp, ev, 1)
+ raises(TypeError, cmp, ev, "ev")
+
+ ev = select.kevent(fd, select.KQ_FILTER_WRITE)
+ assert ev.ident == fd
+ assert ev.filter == select.KQ_FILTER_WRITE
+ assert ev.flags == select.KQ_EV_ADD
+ assert ev.fflags == 0
+ assert ev.data == 0
+ assert ev.udata == 0
+ assert ev == ev
+ assert ev != other
+
+ ev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ONESHOT)
+ assert ev.ident == fd
+ assert ev.filter == select.KQ_FILTER_WRITE
+ assert ev.flags == select.KQ_EV_ONESHOT
+ assert ev.fflags == 0
+ assert ev.data == 0
+ assert ev.udata == 0
+ assert ev == ev
+ assert ev != other
+
+ ev = select.kevent(1, 2, 3, 4, 5, 6)
+ assert ev.ident == 1
+ assert ev.filter == 2
+ assert ev.flags == 3
+ assert ev.fflags == 4
+ assert ev.data == 5
+ assert ev.udata == 6
+ assert ev == ev
+ assert ev != other
+
+ bignum = sys.maxsize * 2 + 1
+ fd = sys.maxsize
+ ev = select.kevent(fd, 1, 2, bignum, sys.maxsize, bignum)
+ assert ev.ident == fd
+ assert ev.filter == 1
+ assert ev.flags == 2
+ assert ev.fflags == bignum
+ assert ev.data == sys.maxsize
+ assert ev.udata == bignum
+ assert ev == ev
+ assert ev != other
+
+ def test_queue_event(self):
+ import errno
+ import select
+ import socket
+ import sys
+
+ server_socket = socket.socket()
+ server_socket.bind(("127.0.0.1", 0))
+ server_socket.listen(1)
+ client = socket.socket()
+ client.setblocking(False)
+ try:
+ client.connect(("127.0.0.1", server_socket.getsockname()[1]))
+ except socket.error as e:
+ if 'bsd' in sys.platform:
+ assert e.args[0] == errno.ENOENT
+ else:
+ assert e.args[0] == errno.EINPROGRESS
+ server, addr = server_socket.accept()
+
+ if sys.platform.startswith("darwin"):
+ flags = select.KQ_EV_ADD | select.KQ_EV_ENABLE
+ else:
+ flags = 0
+
+ kq1 = select.kqueue()
+ kq2 = select.kqueue.fromfd(kq1.fileno())
+
+ ev = select.kevent(server.fileno(), select.KQ_FILTER_WRITE,
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+ kq1.control([ev], 0)
+ ev = select.kevent(server.fileno(), select.KQ_FILTER_READ,
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+ kq1.control([ev], 0)
+ ev = select.kevent(client.fileno(), select.KQ_FILTER_WRITE,
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+ kq2.control([ev], 0)
+ ev = select.kevent(client.fileno(), select.KQ_FILTER_READ,
select.KQ_EV_ADD | select.KQ_EV_ENABLE)
+ kq2.control([ev], 0)
+
+ events = kq1.control(None, 4, 1)
+ events = [(e.ident, e.filter, e.flags) for e in events]
+ events.sort()
+ assert events == [
+ (client.fileno(), select.KQ_FILTER_WRITE, flags),
+ (server.fileno(), select.KQ_FILTER_WRITE, flags),
+ ]
+ client.send("Hello!")
+ server.send("world!!!")
+
+ for i in xrange(10):
+ events = kq1.control(None, 4, 1)
+ if len(events) == 4:
+ break
+ time.sleep(1.0)
+ else:
+ assert False, "timeout waiting for event notification"
+
+ events = [(e.ident, e.filter, e.flags) for e in events]
+ events.sort()
+ assert events == [
+ (client.fileno(), select.KQ_FILTER_WRITE, flags),
+ (client.fileno(), select.KQ_FILTER_READ, flags),
+ (server.fileno(), select.KQ_FILTER_WRITE, flags),
+ (server.fileno(), select.KQ_FILTER_READ, flags),
+ ]
+
+ ev = select.kevent(client.fileno(), select.KQ_FILTER_WRITE,
select.KQ_EV_DELETE)
+ kq1.control([ev], 0)
+ ev = select.kevent(client.fileno(), select.KQ_FILTER_READ,
select.KQ_EV_DELETE)
+ kq1.control([ev], 0)
+ ev = select.kevent(server.fileno(), select.KQ_FILTER_READ,
select.KQ_EV_DELETE)
+ kq1.control([ev], 0, 0)
+
+ events = kq1.control([], 4, 0.99)
+ events = [(e.ident, e.filter, e.flags) for e in events]
+ events.sort()
+ assert events == [
+ (server.fileno(), select.KQ_FILTER_WRITE, flags),
+ ]
+
+ client.close()
+ server.close()
+ server_socket.close()
+
+ def test_pair(self):
+ import select
+ import socket
+
+ kq = select.kqueue()
+ a, b = socket.socketpair()
+
+ a.send('foo')
+ event1 = select.kevent(a, select.KQ_FILTER_READ, select.KQ_EV_ADD |
select.KQ_EV_ENABLE)
+ event2 = select.kevent(b, select.KQ_FILTER_READ, select.KQ_EV_ADD |
select.KQ_EV_ENABLE)
+ r = kq.control([event1, event2], 1, 1)
+ assert r
+ assert r[0].flags & select.KQ_EV_ERROR == 0
+ data = b.recv(r[0].data)
+ assert data == 'foo'
+
+ a.close()
+ b.close()
+ kq.close()
diff --git a/pypy/rpython/lltypesystem/rffi.py
b/pypy/rpython/lltypesystem/rffi.py
--- a/pypy/rpython/lltypesystem/rffi.py
+++ b/pypy/rpython/lltypesystem/rffi.py
@@ -433,7 +433,8 @@
TYPES.append(name)
TYPES += ['signed char', 'unsigned char',
'long long', 'unsigned long long',
- 'size_t', 'time_t', 'wchar_t']
+ 'size_t', 'time_t', 'wchar_t',
+ 'uintptr_t', 'intptr_t']
if os.name != 'nt':
TYPES.append('mode_t')
TYPES.append('pid_t')
@@ -617,8 +618,6 @@
# (use SIGNEDCHAR or UCHAR for the small integer types)
CHAR = lltype.Char
-INTPTR_T = SSIZE_T
-
# double
DOUBLE = lltype.Float
LONGDOUBLE = lltype.LongFloat
_______________________________________________
pypy-commit mailing list
[email protected]
http://mail.python.org/mailman/listinfo/pypy-commit