Charles-François Natali added the comment:
This should fix some kqueue failures (the FDs were not unregistered,
which could result in spurious FDs being reported at a later time).
For the "negative FDs", all spurious events are now caught and logged
(I think it's better than silently ignoring them as it is now).
Hopefully we'll see an EV_ERROR with kev.data set to a sensible errno.
The implementation should now be more or less complete, apart from the
WSAPoll, which I'm unable to add/test.
Richard, in Tulip's WSAPoll code, it reads:
class WindowsPollPollster(PollPollster):
"""Pollster implementation using WSAPoll.
WSAPoll is only available on Windows Vista and later. Python
does not currently support WSAPoll, but there is a patch
available at http://bugs.python.org/issue16507.
"""
Does this mean that this code needs the patch from issue #16507 to work?
Also, I've read something about IOCP: is this a replacement for
WSAPoll? Are there plans to get it merged into Python at some point (and
if so, would the select module be a proper home for it)?
----------
Added file: http://bugs.python.org/file28637/tulip-selectors-3.diff
_______________________________________
Python tracker <rep...@bugs.python.org>
<http://bugs.python.org/issue16853>
_______________________________________
diff --git a/runtests.py b/runtests.py
--- a/runtests.py
+++ b/runtests.py
@@ -2,9 +2,12 @@
# Originally written by Beech Horn (for NDB).
+import logging
import sys
import unittest
+##logging.basicConfig(level=logging.DEBUG)
+
def load_tests():
mods = ['events', 'futures', 'tasks']
diff --git a/tulip/events_test.py b/tulip/events_test.py
--- a/tulip/events_test.py
+++ b/tulip/events_test.py
@@ -10,6 +10,7 @@
from . import events
from . import transports
from . import protocols
+from . import selectors
from . import unix_events
@@ -37,8 +38,8 @@
class EventLoopTestsMixin:
def setUp(self):
- pollster = self.POLLSTER_CLASS()
- event_loop = unix_events.UnixEventLoop(pollster)
+ selector = self.SELECTOR_CLASS()
+ event_loop = unix_events.UnixEventLoop(selector)
events.set_event_loop(event_loop)
def testRun(self):
@@ -230,24 +231,24 @@
el.run_once()
-if hasattr(select, 'kqueue'):
+if hasattr(selectors, 'KqueueSelector'):
class KqueueEventLoopTests(EventLoopTestsMixin, unittest.TestCase):
- POLLSTER_CLASS = unix_events.KqueuePollster
+ SELECTOR_CLASS = selectors.KqueueSelector
-if hasattr(select, 'epoll'):
+if hasattr(selectors, 'EpollSelector'):
class EPollEventLoopTests(EventLoopTestsMixin, unittest.TestCase):
- POLLSTER_CLASS = unix_events.EPollPollster
+ SELECTOR_CLASS = selectors.EpollSelector
-if hasattr(select, 'poll'):
+if hasattr(selectors, 'PollSelector'):
class PollEventLoopTests(EventLoopTestsMixin, unittest.TestCase):
- POLLSTER_CLASS = unix_events.PollPollster
+ SELECTOR_CLASS = selectors.PollSelector
# Should always exist.
class SelectEventLoopTests(EventLoopTestsMixin, unittest.TestCase):
- POLLSTER_CLASS = unix_events.SelectPollster
+ SELECTOR_CLASS = selectors.SelectSelector
class HandlerTests(unittest.TestCase):
diff --git a/tulip/selectors.py b/tulip/selectors.py
new file mode 100644
--- /dev/null
+++ b/tulip/selectors.py
@@ -0,0 +1,372 @@
+"""Select module.
+
+This module supports asynchronous I/O on multiple file descriptors.
+"""
+
+
+import logging
+import math
+from select import *
+
+
+# Backend-independent event flags; every selector implementation translates
+# them to and from its own native constants.
+# Read event.
+SELECT_IN = 0x1
+# Write event.
+SELECT_OUT = 0x2
+# Connect event; currently an alias of the write event.
+SELECT_CONNECT = SELECT_OUT
+
+
+def _fileobj_to_fd(fileobj):
+    """Return a file descriptor from a file object.
+
+    Parameters:
+    fileobj -- file descriptor, or any object with a `fileno()` method
+
+    Returns:
+    corresponding file descriptor
+
+    Raises ValueError if the object has no usable `fileno()` method, or if
+    the resulting file descriptor is negative.
+    """
+    if isinstance(fileobj, int):
+        fd = fileobj
+    else:
+        try:
+            fd = int(fileobj.fileno())
+        except (AttributeError, TypeError, ValueError):
+            # AttributeError: the object has no fileno() at all;
+            # TypeError/ValueError: fileno is not callable or its result
+            # cannot be converted to an integer.
+            raise ValueError("Invalid file object: {}".format(fileobj))
+    if fd < 0:
+        # A negative descriptor can never be monitored; reject it here
+        # instead of letting it reach the underlying system call.
+        raise ValueError("Invalid file descriptor: {}".format(fd))
+    return fd
+
+
+class _Key:
+    """Internal record describing one registered file object.
+
+    It stores the file object itself, the file descriptor derived from it,
+    the event mask being monitored and the data attached at registration.
+    """
+
+    def __init__(self, fileobj, events, data=None):
+        # Resolve the descriptor first: an invalid file object makes
+        # _fileobj_to_fd() raise before any attribute is set.
+        fd = _fileobj_to_fd(fileobj)
+        self.fileobj, self.fd = fileobj, fd
+        self.events, self.data = events, data
+
+
+class _BaseSelector:
+    """Base selector class.
+
+    A selector supports registering file objects to be monitored for specific
+    I/O events.
+
+    A file object is a file descriptor or any object with a `fileno()` method.
+    An arbitrary object can be attached to the file object, which can be used
+    for example to store context information, a callback, etc.
+
+    A selector can use various implementations (select(), poll(), epoll()...)
+    depending on the platform. The default `Selector` class uses the most
+    performant implementation on the current platform.
+    """
+
+    def __init__(self):
+        # this maps file descriptors to keys
+        self._fd_to_key = {}
+        # this maps file objects to keys - for fast (un)registering
+        self._fileobj_to_key = {}
+
+    @staticmethod
+    def _check_events(events):
+        """Raise ValueError unless `events` is a non-empty combination of
+        SELECT_IN and SELECT_OUT."""
+        if (not events) or (events & ~(SELECT_IN|SELECT_OUT)):
+            raise ValueError("Invalid events: {}".format(events))
+
+    def register(self, fileobj, events, data=None):
+        """Register a file object.
+
+        Parameters:
+        fileobj -- file object
+        events  -- events to monitor (bitwise mask of SELECT_IN|SELECT_OUT)
+        data    -- attached data
+
+        Returns:
+        the key created for this registration
+
+        Raises ValueError if `events` is empty or contains unknown bits, and
+        KeyError if the file object is already registered.
+        """
+        self._check_events(events)
+
+        if fileobj in self._fileobj_to_key:
+            raise KeyError("{} is already registered".format(fileobj))
+
+        key = _Key(fileobj, events, data)
+        self._fd_to_key[key.fd] = key
+        self._fileobj_to_key[fileobj] = key
+        return key
+
+    def unregister(self, fileobj):
+        """Unregister a file object.
+
+        Parameters:
+        fileobj -- file object
+
+        Returns:
+        the key that was associated to the file object
+
+        Raises KeyError if the file object is not registered.
+        """
+        try:
+            key = self._fileobj_to_key[fileobj]
+            del self._fd_to_key[key.fd]
+            del self._fileobj_to_key[fileobj]
+        except KeyError:
+            raise KeyError("{} is not registered".format(fileobj))
+        return key
+
+    def modify(self, fileobj, events, data=None):
+        """Change a registered file object monitored events or attached data.
+
+        Parameters:
+        fileobj -- file object
+        events  -- events to monitor (bitwise mask of SELECT_IN|SELECT_OUT)
+        data    -- attached data
+
+        Raises KeyError if the file object is not registered, and ValueError
+        if `events` is invalid; in both cases the current registration is
+        left untouched.
+        """
+        if fileobj not in self._fileobj_to_key:
+            raise KeyError("{} is not registered".format(fileobj))
+        # Validate before unregistering: otherwise a rejected mask would
+        # leave the file object silently unregistered.
+        self._check_events(events)
+        self.unregister(fileobj)
+        self.register(fileobj, events, data)
+
+    def select(self, timeout=None):
+        """Perform the actual selection, until some monitored file objects are
+        ready or a timeout expires.
+
+        Parameters:
+        timeout -- if timeout > 0, this specifies the maximum wait time, in
+                   seconds
+                   if timeout == 0, the select() call won't block, and will
+                   report the currently ready file objects
+                   if timeout is None, select() will block until a monitored
+                   file object becomes ready
+
+        Returns:
+        list of (fileobj, events, attached data) for ready file objects
+        `events` is a bitwise mask of SELECT_IN|SELECT_OUT
+
+        Must be implemented by subclasses; the base class raises
+        NotImplementedError.
+        """
+        raise NotImplementedError()
+
+    def close(self):
+        """Close the selector.
+
+        This must be called to make sure that any underlying resource is freed.
+        """
+        self._fd_to_key.clear()
+        self._fileobj_to_key.clear()
+
+    def get_info(self, fileobj):
+        """Return information about a registered file object.
+
+        Returns:
+        (events, data) associated to this file object
+
+        Raises KeyError if the file object is not registered.
+        """
+        try:
+            key = self._fileobj_to_key[fileobj]
+        except KeyError:
+            raise KeyError("{} is not registered".format(fileobj))
+        return key.events, key.data
+
+    def registered_count(self):
+        """Return the number of registered file objects.
+
+        Returns:
+        number of currently registered file objects
+        """
+        return len(self._fd_to_key)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        # Leaving the `with` block releases the selector's resources.
+        self.close()
+
+    def _key_from_fd(self, fd):
+        """Return the key associated to a given file descriptor.
+
+        Parameters:
+        fd -- file descriptor
+
+        Returns:
+        corresponding key
+
+        Raises KeyError if no registered file object uses this descriptor.
+        """
+        return self._fd_to_key[fd]
+
+
+class SelectSelector(_BaseSelector):
+    """Select-based selector."""
+
+    def __init__(self):
+        super().__init__()
+        # File descriptors monitored for reading / writing; these sets are
+        # passed directly to select().
+        self._readers = set()
+        self._writers = set()
+
+    def register(self, fileobj, events, data=None):
+        """Register a file object; see _BaseSelector.register()."""
+        key = super().register(fileobj, events, data)
+        if events & SELECT_IN:
+            self._readers.add(key.fd)
+        if events & SELECT_OUT:
+            self._writers.add(key.fd)
+        # Hand the key back, as the base class does.
+        return key
+
+    def unregister(self, fileobj):
+        """Unregister a file object; see _BaseSelector.unregister()."""
+        key = super().unregister(fileobj)
+        self._readers.discard(key.fd)
+        self._writers.discard(key.fd)
+        return key
+
+    def select(self, timeout=None):
+        """Wait with select() and return a list of
+        (fileobj, events, attached data) for the ready file objects."""
+        r, w, _ = select(self._readers, self._writers, [], timeout)
+        r = set(r)
+        w = set(w)
+        ready = []
+        for fd in r | w:
+            events = 0
+            if fd in r:
+                events |= SELECT_IN
+            if fd in w:
+                events |= SELECT_OUT
+
+            try:
+                key = self._key_from_fd(fd)
+            except KeyError:
+                # Reported by select() but unknown to us: log it rather
+                # than dropping it silently.
+                logging.warning("select: spurious fd=%s", fd)
+            else:
+                ready.append((key.fileobj, events, key.data))
+        return ready
+
+
+if 'poll' in globals():
+
+    class PollSelector(_BaseSelector):
+        """Poll-based selector."""
+
+        def __init__(self):
+            super().__init__()
+            self._poll = poll()
+
+        def register(self, fileobj, events, data=None):
+            """Register a file object; see _BaseSelector.register()."""
+            key = super().register(fileobj, events, data)
+            poll_events = 0
+            if events & SELECT_IN:
+                poll_events |= POLLIN
+            if events & SELECT_OUT:
+                poll_events |= POLLOUT
+            self._poll.register(key.fd, poll_events)
+            # Hand the key back, as the base class does.
+            return key
+
+        def unregister(self, fileobj):
+            """Unregister a file object; see _BaseSelector.unregister()."""
+            key = super().unregister(fileobj)
+            self._poll.unregister(key.fd)
+            return key
+
+        def select(self, timeout=None):
+            """Wait with poll() and return a list of
+            (fileobj, events, attached data) for the ready file objects."""
+            if timeout is not None:
+                # poll() takes whole milliseconds.  Round up so that a small
+                # positive timeout is not truncated to 0, which would turn a
+                # short wait into a non-blocking call.
+                timeout = math.ceil(1000 * timeout)
+            ready = []
+            for fd, event in self._poll.poll(timeout):
+                # Any flag other than POLLIN marks the fd as writable and
+                # any flag other than POLLOUT marks it as readable, so
+                # conditions other than plain readiness are reported as both.
+                events = 0
+                if event & ~POLLIN:
+                    events |= SELECT_OUT
+                if event & ~POLLOUT:
+                    events |= SELECT_IN
+
+                try:
+                    key = self._key_from_fd(fd)
+                except KeyError:
+                    # Reported by poll() but unknown to us: log it rather
+                    # than dropping it silently.
+                    logging.warning("poll: spurious fd=%s event=%s",
+                                    fd, event)
+                else:
+                    ready.append((key.fileobj, events, key.data))
+            return ready
+
+
+if 'epoll' in globals():
+
+    class EpollSelector(_BaseSelector):
+        """Epoll-based selector."""
+
+        def __init__(self):
+            super().__init__()
+            self._epoll = epoll()
+
+        def register(self, fileobj, events, data=None):
+            """Register a file object; see _BaseSelector.register()."""
+            key = super().register(fileobj, events, data)
+            epoll_events = 0
+            if events & SELECT_IN:
+                epoll_events |= EPOLLIN
+            if events & SELECT_OUT:
+                epoll_events |= EPOLLOUT
+            self._epoll.register(key.fd, epoll_events)
+            # Hand the key back, as the base class does.
+            return key
+
+        def unregister(self, fileobj):
+            """Unregister a file object; see _BaseSelector.unregister()."""
+            key = super().unregister(fileobj)
+            self._epoll.unregister(key.fd)
+            return key
+
+        def select(self, timeout=None):
+            """Wait with epoll and return a list of
+            (fileobj, events, attached data) for the ready file objects."""
+            # epoll.poll() uses -1 to mean "wait forever".
+            timeout = -1 if timeout is None else timeout
+            # epoll.poll() rejects maxevents == 0, so ask for at least one
+            # event even when nothing is registered.
+            max_ev = max(self.registered_count(), 1)
+            ready = []
+            for fd, event in self._epoll.poll(timeout, max_ev):
+                # Any flag other than EPOLLIN marks the fd as writable and
+                # any flag other than EPOLLOUT marks it as readable, so
+                # conditions other than plain readiness are reported as both.
+                events = 0
+                if event & ~EPOLLIN:
+                    events |= SELECT_OUT
+                if event & ~EPOLLOUT:
+                    events |= SELECT_IN
+
+                try:
+                    key = self._key_from_fd(fd)
+                except KeyError:
+                    # Reported by epoll but unknown to us: log it rather
+                    # than dropping it silently.
+                    logging.warning("epoll: spurious fd=%s event=%s",
+                                    fd, event)
+                else:
+                    ready.append((key.fileobj, events, key.data))
+            return ready
+
+        def close(self):
+            """Forget all registrations and close the epoll object."""
+            super().close()
+            self._epoll.close()
+
+
+if 'kqueue' in globals():
+
+    class KqueueSelector(_BaseSelector):
+        """Kqueue-based selector."""
+
+        def __init__(self):
+            super().__init__()
+            self._kqueue = kqueue()
+
+        def register(self, fileobj, events, data=None):
+            """Register a file object; see _BaseSelector.register()."""
+            key = super().register(fileobj, events, data)
+            # One kevent is added per requested direction.
+            if events & SELECT_IN:
+                kev = kevent(key.fd, KQ_FILTER_READ, KQ_EV_ADD)
+                self._kqueue.control([kev], 0, 0)
+            if events & SELECT_OUT:
+                kev = kevent(key.fd, KQ_FILTER_WRITE, KQ_EV_ADD)
+                self._kqueue.control([kev], 0, 0)
+            # Hand the key back, as the base class does.
+            return key
+
+        def unregister(self, fileobj):
+            """Unregister a file object; see _BaseSelector.unregister()."""
+            key = super().unregister(fileobj)
+            # After `from select import *` the name `select` is the select()
+            # function, not the module, so kevent must be used unqualified.
+            if key.events & SELECT_IN:
+                kev = kevent(key.fd, KQ_FILTER_READ, KQ_EV_DELETE)
+                self._kqueue.control([kev], 0, 0)
+            if key.events & SELECT_OUT:
+                kev = kevent(key.fd, KQ_FILTER_WRITE, KQ_EV_DELETE)
+                self._kqueue.control([kev], 0, 0)
+            return key
+
+        def select(self, timeout=None):
+            """Wait with kqueue and return a list of
+            (fileobj, events, attached data) for the ready file objects."""
+            # Always ask for at least one event: with max_ev == 0 the call
+            # would return at once instead of honouring the timeout.
+            max_ev = max(self.registered_count(), 1)
+            ready = []
+            for kev in self._kqueue.control(None, max_ev, timeout):
+                fd = kev.ident
+                flag = kev.filter
+                events = 0
+                if flag == KQ_FILTER_READ:
+                    events |= SELECT_IN
+                if flag == KQ_FILTER_WRITE:
+                    events |= SELECT_OUT
+
+                try:
+                    key = self._key_from_fd(fd)
+                except KeyError:
+                    # Reported by kqueue but unknown to us: log the whole
+                    # kevent rather than dropping it silently.
+                    logging.warning("kqueue: spurious kev=%s", kev)
+                else:
+                    ready.append((key.fileobj, events, key.data))
+            return ready
+
+        def close(self):
+            """Forget all registrations and close the kqueue object."""
+            super().close()
+            self._kqueue.close()
+
+
+# Pick the preferred implementation defined on this platform; the ranking is
+# roughly epoll|kqueue > poll > select.
+# select() also can't accept a FD > FD_SETSIZE (usually around 1024)
+Selector = (globals().get('KqueueSelector') or
+            globals().get('EpollSelector') or
+            globals().get('PollSelector') or
+            SelectSelector)
diff --git a/tulip/unix_events.py b/tulip/unix_events.py
--- a/tulip/unix_events.py
+++ b/tulip/unix_events.py
@@ -1,10 +1,8 @@
"""UNIX event loop and related classes.
-NOTE: The Pollster classes are not part of the published API.
-
-The event loop can be broken up into a pollster (the part responsible
+The event loop can be broken up into a selector (the part responsible
for telling us when file descriptors are ready) and the event loop
-proper, which wraps a pollster with functionality for scheduling
+proper, which wraps a selector with functionality for scheduling
callbacks, immediately or at a given time in the future.
Whenever a public API takes a callback, subsequent positional
@@ -13,22 +11,6 @@
Keyword arguments for the callback are not supported; this is a
conscious design decision, leaving the door open for keyword arguments
to modify the meaning of the API call itself.
-
-There are several implementations of the pollster part, several using
-esoteric system calls that exist only on some platforms. These are:
-
-- kqueue (most BSD systems)
-- epoll (newer Linux systems)
-- poll (most UNIX systems)
-- select (all UNIX systems, and Windows)
-
-NOTE: We don't use select on systems where any of the others is
-available, because select performs poorly as the number of file
-descriptors goes up. The ranking is roughly:
-
- 1. kqueue, epoll, IOCP (best for each platform)
- 2. poll (linear in number of file descriptors polled)
- 3. select (linear in max number of file descriptors supported)
"""
import collections
@@ -46,6 +28,7 @@
from . import events
from . import futures
from . import protocols
+from . import selectors
from . import tasks
from . import transports
@@ -73,352 +56,6 @@
_MAX_WORKERS = 5
-class PollsterBase:
- """Base class for all polling implementations.
-
- This defines an interface to register and unregister readers and
- writers for specific file descriptors, and an interface to get a
- list of events. There's also an interface to check whether any
- readers or writers are currently registered.
- """
-
- def __init__(self):
- super().__init__()
- self.readers = {} # {fd: handler, ...}.
- self.writers = {} # {fd: handler, ...}.
-
- def pollable(self):
- """Return the number readers and writers currently registered."""
- # The event loop needs the number since it must subtract one for
- # the self-pipe.
- return len(self.readers) + len(self.writers)
-
- # Subclasses are expected to extend the add/remove methods.
-
- def register_reader(self, fd, handler):
- """Add or update a reader for a file descriptor."""
- self.readers[fd] = handler
-
- def register_writer(self, fd, handler):
- """Add or update a writer for a file descriptor."""
- self.writers[fd] = handler
-
- def unregister_reader(self, fd):
- """Remove the reader for a file descriptor."""
- del self.readers[fd]
-
- def unregister_writer(self, fd):
- """Remove the writer for a file descriptor."""
- del self.writers[fd]
-
- def register_connector(self, fd, handler):
- """Add or update a connector for a file descriptor."""
- # On Unix a connector is the same as a writer.
- self.register_writer(fd, handler)
-
- def unregister_connector(self, fd):
- """Remove the connector for a file descriptor."""
- # On Unix a connector is the same as a writer.
- self.unregister_writer(fd)
-
- def poll(self, timeout=None):
- """Poll for I/O events. A subclass must implement this.
-
- If timeout is omitted or None, this blocks until at least one
- event is ready. Otherwise, timeout gives a maximum time to
- wait (an int of float in seconds) -- the method returns as
- soon as at least one event is ready or when the timeout is
- expired. For a non-blocking poll, pass 0.
-
- The return value is a list of events; it is empty when the
- timeout expired before any events were ready. Each event
- is a handler previously passed to register_reader/writer().
- """
- raise NotImplementedError
-
-
-if sys.platform != 'win32':
-
- class SelectPollster(PollsterBase):
- """Pollster implementation using select."""
-
- def poll(self, timeout=None):
- readable, writable, _ = select.select(self.readers, self.writers,
- [], timeout)
- events = []
- events += (self.readers[fd] for fd in readable)
- events += (self.writers[fd] for fd in writable)
- return events
-
-else:
-
- class SelectPollster(PollsterBase):
- """Pollster implementation using select."""
-
- def __init__(self):
- super().__init__()
- self.exceptionals = {}
-
- def poll(self, timeout=None):
- # Failed connections are reported as exceptional but not writable.
- readable, writable, exceptional = select.select(
- self.readers, self.writers, self.exceptionals, timeout)
- writable = set(writable).union(exceptional)
- events = []
- events += (self.readers[fd] for fd in readable)
- events += (self.writers[fd] for fd in writable)
- return events
-
- def register_connector(self, fd, token):
- self.register_writer(fd, token)
- self.exceptionals[fd] = token
-
- def unregister_connector(self, fd):
- self.unregister_writer(fd)
- try:
- del self.exceptionals[fd]
- except KeyError:
- # remove_connector() does not check fd in self.exceptionals.
- pass
-
-
-class PollPollster(PollsterBase):
- """Pollster implementation using poll."""
-
- def __init__(self):
- super().__init__()
- self._poll = select.poll()
-
- def _update(self, fd):
- assert isinstance(fd, int), fd
- flags = 0
- if fd in self.readers:
- flags |= select.POLLIN
- if fd in self.writers:
- flags |= select.POLLOUT
- if flags:
- self._poll.register(fd, flags)
- else:
- self._poll.unregister(fd)
-
- def register_reader(self, fd, handler):
- super().register_reader(fd, handler)
- self._update(fd)
-
- def register_writer(self, fd, handler):
- super().register_writer(fd, handler)
- self._update(fd)
-
- def unregister_reader(self, fd):
- super().unregister_reader(fd)
- self._update(fd)
-
- def unregister_writer(self, fd):
- super().unregister_writer(fd)
- self._update(fd)
-
- def poll(self, timeout=None):
- # Timeout is in seconds, but poll() takes milliseconds.
- msecs = None if timeout is None else int(round(1000 * timeout))
- events = []
- for fd, flags in self._poll.poll(msecs):
- if flags & ~select.POLLOUT:
- if fd in self.readers:
- events.append(self.readers[fd])
- if flags & ~select.POLLIN:
- if fd in self.writers:
- events.append(self.writers[fd])
- return events
-
-
-if sys.platform == 'win32':
-
- class WindowsPollPollster(PollPollster):
- """Pollster implementation using WSAPoll.
-
- WSAPoll is only available on Windows Vista and later. Python
- does not currently support WSAPoll, but there is a patch
- available at http://bugs.python.org/issue16507.
- """
-
- # REAP_PERIOD is the maximum wait before checking for failed
- # connections. This is necessary because WSAPoll() does notify us
- # of failed connections. See
- # daniel.haxx.se/blog/2012/10/10/wsapoll-is-broken/
- REAP_PERIOD = 5.0
-
- # FD_SETSIZE is maximum number of sockets in an fd_set
- FD_SETSIZE = 512
-
- def __init__(self):
- super().__init__()
- self.exceptionals = {}
-
- def register_connector(self, fd, token):
- self.register_writer(fd, token)
- self.exceptionals[fd] = token
-
- def unregister_connector(self, fd):
- self.unregister_writer(fd)
- try:
- del self.exceptionals[fd]
- except KeyError:
- # remove_connector() does not check fd in self.exceptionals.
- pass
-
- def _get_failed_connector_events(self):
- fds = []
- remaining = list(self.exceptionals)
- while remaining:
- fds += select.select([], [], remaining[:self.FD_SETSIZE], 0)[2]
- del remaining[:self.FD_SETSIZE]
- return [(fd, select.POLLOUT) for fd in fds]
-
- def poll(self, timeout=None):
- if not self.exceptionals:
- msecs = None if timeout is None else int(round(1000 * timeout))
- polled = self._poll.poll(msecs)
-
- elif timeout is None:
- polled = None
- while not polled:
- polled = (self._get_failed_connector_events() or
- self._poll.poll(self.REAP_PERIOD))
-
- elif timeout == 0:
- polled = (self._get_failed_connector_events() or
- self._poll.poll(0))
-
- else:
- start = time.monotonic()
- deadline = start + timeout
- polled = None
- while timeout >= 0:
- msecs = int(round(1000 * min(self.REAP_PERIOD, timeout)))
- polled = (self._get_failed_connector_events() or
- self._poll.poll(self.REAP_PERIOD))
- if polled:
- break
- timemout = deadline - time.monotonic()
-
- events = []
- for fd, flags in polled:
- if flags & ~select.POLLOUT:
- if fd in self.readers:
- events.append(self.readers[fd])
- if flags & ~select.POLLIN:
- if fd in self.writers:
- events.append(self.writers[fd])
- return events
-
- PollPollster = WindowsPollPollster
-
-
-class EPollPollster(PollsterBase):
- """Pollster implementation using epoll."""
-
- def __init__(self):
- super().__init__()
- self._epoll = select.epoll()
-
- def _update(self, fd):
- assert isinstance(fd, int), fd
- eventmask = 0
- if fd in self.readers:
- eventmask |= select.EPOLLIN
- if fd in self.writers:
- eventmask |= select.EPOLLOUT
- if eventmask:
- try:
- self._epoll.register(fd, eventmask)
- except IOError:
- self._epoll.modify(fd, eventmask)
- else:
- self._epoll.unregister(fd)
-
- def register_reader(self, fd, handler):
- super().register_reader(fd, handler)
- self._update(fd)
-
- def register_writer(self, fd, handler):
- super().register_writer(fd, handler)
- self._update(fd)
-
- def unregister_reader(self, fd):
- super().unregister_reader(fd)
- self._update(fd)
-
- def unregister_writer(self, fd):
- super().unregister_writer(fd)
- self._update(fd)
-
- def poll(self, timeout=None):
- if timeout is None:
- timeout = -1 # epoll.poll() uses -1 to mean "wait forever".
- events = []
- for fd, eventmask in self._epoll.poll(timeout):
- if eventmask & ~select.EPOLLOUT:
- if fd in self.readers:
- events.append(self.readers[fd])
- if eventmask & ~select.EPOLLIN:
- if fd in self.writers:
- events.append(self.writers[fd])
- return events
-
-
-class KqueuePollster(PollsterBase):
- """Pollster implementation using kqueue."""
-
- def __init__(self):
- super().__init__()
- self._kqueue = select.kqueue()
-
- def register_reader(self, fd, handler):
- if fd not in self.readers:
- kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
- self._kqueue.control([kev], 0, 0)
- return super().register_reader(fd, handler)
-
- def register_writer(self, fd, handler):
- if fd not in self.writers:
- kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
- self._kqueue.control([kev], 0, 0)
- return super().register_writer(fd, handler)
-
- def unregister_reader(self, fd):
- super().unregister_reader(fd)
- kev = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
- self._kqueue.control([kev], 0, 0)
-
- def unregister_writer(self, fd):
- super().unregister_writer(fd)
- kev = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
- self._kqueue.control([kev], 0, 0)
-
- def poll(self, timeout=None):
- events = []
- max_ev = len(self.readers) + len(self.writers)
- for kev in self._kqueue.control(None, max_ev, timeout):
- fd = kev.ident
- flag = kev.filter
- if flag == select.KQ_FILTER_READ and fd in self.readers:
- events.append(self.readers[fd])
- elif flag == select.KQ_FILTER_WRITE and fd in self.writers:
- events.append(self.writers[fd])
- return events
-
-
-# Pick the best pollster class for the platform.
-if hasattr(select, 'kqueue'):
- best_pollster = KqueuePollster
-elif hasattr(select, 'epoll'):
- best_pollster = EPollPollster
-elif hasattr(select, 'poll'):
- best_pollster = PollPollster
-else:
- best_pollster = SelectPollster
-
-
class _StopError(BaseException):
"""Raised to stop the event loop."""
@@ -433,12 +70,13 @@
See events.EventLoop for API specification.
"""
- def __init__(self, pollster=None):
+ def __init__(self, selector=None):
super().__init__()
- if pollster is None:
- logging.info('Using pollster: %s', best_pollster.__name__)
- pollster = best_pollster()
- self._pollster = pollster
+        if selector is None:
+            # pick the best selector class for the platform
+            selector = selectors.Selector()
+            # `selector` is an instance at this point, and instances have no
+            # `__name__`; report the name of its class instead.
+            logging.info('Using selector: %s', type(selector).__name__)
+        self._selector = selector
self._ready = collections.deque()
self._scheduled = []
self._everytime = []
@@ -465,7 +103,9 @@
TODO: Give this a timeout too?
"""
- while self._ready or self._scheduled or self._pollster.pollable() > 1:
+ while (self._ready or
+ self._scheduled or
+ self._selector.registered_count() > 1):
try:
self._run_once()
except _StopError:
@@ -702,36 +342,83 @@
def add_reader(self, fd, callback, *args):
"""Add a reader callback. Return a Handler instance."""
handler = events.Handler(None, callback, args)
- self._pollster.register_reader(fd, handler)
+ try:
+ mask, (reader, writer, connector) = self._selector.get_info(fd)
+ except KeyError:
+ self._selector.register(fd, selectors.SELECT_IN,
+ (handler, None, None))
+ else:
+ self._selector.modify(fd, mask | selectors.SELECT_IN,
+ (handler, writer, connector))
+
return handler
def remove_reader(self, fd):
"""Remove a reader callback."""
- if fd in self._pollster.readers:
- self._pollster.unregister_reader(fd)
+ try:
+ mask, (reader, writer, connector) = self._selector.get_info(fd)
+ except KeyError:
+ pass
+ else:
+ mask &= ~selectors.SELECT_IN
+ if not mask:
+ self._selector.unregister(fd)
+ else:
+ self._selector.modify(fd, mask, (None, writer, connector))
def add_writer(self, fd, callback, *args):
"""Add a writer callback. Return a Handler instance."""
handler = events.Handler(None, callback, args)
- self._pollster.register_writer(fd, handler)
+ try:
+ mask, (reader, writer, connector) = self._selector.get_info(fd)
+ except KeyError:
+ self._selector.register(fd, selectors.SELECT_OUT,
+ (None, handler, None))
+ else:
+ self._selector.modify(fd, mask | selectors.SELECT_OUT,
+ (reader, handler, connector))
return handler
def remove_writer(self, fd):
"""Remove a writer callback."""
- if fd in self._pollster.writers:
- self._pollster.unregister_writer(fd)
+ try:
+ mask, (reader, writer, connector) = self._selector.get_info(fd)
+ except KeyError:
+ pass
+ else:
+ mask &= ~selectors.SELECT_OUT
+ if not mask:
+ self._selector.unregister(fd)
+ else:
+ self._selector.modify(fd, mask, (reader, None, connector))
def add_connector(self, fd, callback, *args):
"""Add a connector callback. Return a Handler instance."""
- dcall = events.Handler(None, callback, args)
- self._pollster.register_connector(fd, dcall)
- return dcall
+ # XXX As long as SELECT_CONNECT == SELECT_OUT, set the handler
+ # as both writer and connector.
+ handler = events.Handler(None, callback, args)
+ try:
+ mask, (reader, writer, connector) = self._selector.get_info(fd)
+ except KeyError:
+ self._selector.register(fd, selectors.SELECT_CONNECT,
+ (None, handler, handler))
+ else:
+ self._selector.modify(fd, mask | selectors.SELECT_CONNECT,
+ (reader, handler, handler))
+ return handler
def remove_connector(self, fd):
"""Remove a connector callback."""
- # Every connector fd is in self._pollsters.writers.
- if fd in self._pollster.writers:
- self._pollster.unregister_connector(fd)
+ try:
+ mask, (reader, writer, connector) = self._selector.get_info(fd)
+ except KeyError:
+ pass
+ else:
+ mask &= ~selectors.SELECT_CONNECT
+ if not mask:
+ self._selector.unregister(fd)
+ else:
+ self._selector.modify(fd, mask, (reader, None, None))
def sock_recv(self, sock, n):
"""XXX"""
@@ -743,7 +430,7 @@
fd = sock.fileno()
if registered:
# Remove the callback early. It should be rare that the
- # pollster says the fd is ready but the call still returns
+ # selector says the fd is ready but the call still returns
# EAGAIN, and I am willing to take a hit in that case in
# order to simplify the common case.
self.remove_reader(fd)
@@ -876,10 +563,10 @@
while self._scheduled and self._scheduled[0].cancelled:
heapq.heappop(self._scheduled)
- # Inspect the poll queue. If there's exactly one pollable
+ # Inspect the poll queue. If there's exactly one selectable
# file descriptor, it's the self-pipe, and if there's nothing
# scheduled, we should ignore it.
- if self._pollster.pollable() > 1 or self._scheduled:
+ if self._selector.registered_count() > 1 or self._scheduled:
if self._ready:
timeout = 0
elif self._scheduled:
@@ -892,7 +579,7 @@
timeout = min(timeout, deadline)
t0 = time.monotonic()
- events = self._pollster.poll(timeout)
+ event_list = self._selector.select(timeout)
t1 = time.monotonic()
argstr = '' if timeout is None else ' %.3f' % timeout
if t1-t0 >= 1:
@@ -900,8 +587,13 @@
else:
level = logging.DEBUG
logging.log(level, 'poll%s took %.3f seconds', argstr, t1-t0)
- for handler in events:
- self._add_callback(handler)
+ for fileobj, mask, (reader, writer, connector) in event_list:
+ if mask & selectors.SELECT_IN and reader is not None:
+ self._add_callback(reader)
+ if mask & selectors.SELECT_OUT and writer is not None:
+ self._add_callback(writer)
+ elif mask & selectors.SELECT_CONNECT and connector is not None:
+ self._add_callback(connector)
# Handle 'later' callbacks that are ready.
now = time.monotonic()
_______________________________________________
Python-bugs-list mailing list
Unsubscribe:
http://mail.python.org/mailman/options/python-bugs-list/archive%40mail-archive.com