Hi,
I have a good news: I have a work-in-progress branch in Trollius which
should allow to use asyncio modules like aiohttp on Python 3.3 and
later. I'm interested by a review and feedback to see if I should
merge this branch or not.
Example of Trollius using aiohttp (written for asyncio):
---
import aiohttp
import asyncio
import trollius
def get_body(url):
response = yield from aiohttp.request('GET', url)
body = yield from response.read_and_close()
print(body)
loop = trollius.get_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(get_body('http://www.perdu.com/'))
loop.close()
---
I decided to rename the Python module "asyncio" to "trollius" in the
version 0.3 of the Trollius project to support Python 3.4 and later. A
discussion followed this change because authors of modules have to
modify their code to support Trollius, and aiohttp author is not
interested to support Trollius for example (Andrew Svetlov likes
"yield-from", and I understand that ;-)).
I wrote a new "trollius_asyncio_interop" branch in Trollius to support
interoperability with asyncio.
Attached patch is the current differences between trollius and
trollius_asyncio_interop branches. Summary of changes:
- asyncio.set_event_loop() now accepts a Trollius event loop: the
hack(?) is to directly use asyncio.AbstractEventLoop in trollius if
the asyncio module is available
- an asyncio coroutine can yield from a trollius coroutine
- an trollius coroutine can yield an asyncio coroutine
See examples/interop_asyncio.py for an example mixing asyncio and
trollius coroutines. Link to the example:
https://bitbucket.org/enovance/trollius/src/e637d42d59b114528c4467f94162d0f335e5df4d/examples/interop_asyncio.py?at=trollius_interop_asyncio
The nice thing is that asyncio code doesn't need to be modified! I
would prefer to not make asyncio uglier to support Python 2. asyncio
is supposed to be a shiny new feature of Python 3.4 and so don't care
of the old (dead) Python 2!
The limitation is that the application must use a trollius event loop.
Asyncio event loops reject explicitly coroutines using "yield"
(instead of yield-from).
I guess that something should be done for the event loop policy too.
Trollius reuses directly the asyncio.AbstractEventLoop class. It might
cause issues if trollius and asyncio versions are different (if the
class is different). Since it's an abstract class and almost all
methods are overriden, I'm not sure that it's a real issue.
Trollius project:
http://trollius.readthedocs.org/
Victor
[?1034hdiff -r 1fd4ec764042 -r e637d42d59b1 examples/interop_asyncio.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/examples/interop_asyncio.py Wed Jun 11 17:29:29 2014 +0200
@@ -0,0 +1,51 @@
+import asyncio
+import trollius
+
[email protected]
+def asyncio_noop():
+ pass
+
[email protected]
+def asyncio_coroutine(coro):
+ print("asyncio coroutine")
+ res = yield from coro
+ print("asyncio inner coroutine result: %r" % (res,))
+ print("asyncio coroutine done")
+ return "asyncio"
+
[email protected]
+def trollius_noop():
+ pass
+
[email protected]
+def trollius_coroutine(coro):
+ print("trollius coroutine")
+ res = yield trollius.From(coro)
+ print("trollius inner coroutine result: %r" % (res,))
+ print("trollius coroutine done")
+ raise trollius.Return("trollius")
+
+def main():
+ # create an event loop for the main thread: use Trollius event loop
+ loop = trollius.get_event_loop()
+
+ # set the asyncio event loop (for the main thread)
+ asyncio.set_event_loop(loop)
+
+ print("[ asyncio coroutine called from trollius coroutine ]")
+ coro1 = asyncio_noop()
+ coro2 = asyncio_coroutine(coro1)
+ res = loop.run_until_complete(trollius_coroutine(coro2))
+ print("trollius coroutine result: %r" % res)
+ print("")
+
+ print("[ asyncio coroutine called from trollius coroutine ]")
+ coro1 = trollius_noop()
+ coro2 = trollius_coroutine(coro1)
+ res = loop.run_until_complete(asyncio_coroutine(coro2))
+ print("asyncio coroutine result: %r" % res)
+ print("")
+
+ loop.close()
+
+main()
diff -r 1fd4ec764042 -r e637d42d59b1 tests/test_tasks.py
--- a/tests/test_tasks.py Wed Jun 11 10:34:18 2014 +0200
+++ b/tests/test_tasks.py Wed Jun 11 17:29:29 2014 +0200
@@ -1451,7 +1451,7 @@ class TaskTests(test_utils.TestCase):
cw.send(None)
try:
cw.send(arg)
- except Return as ex:
+ except StopIteration as ex:
return ex.value
else:
raise AssertionError('StopIteration was expected')
diff -r 1fd4ec764042 -r e637d42d59b1 trollius/coroutines.py
--- a/trollius/coroutines.py Wed Jun 11 10:34:18 2014 +0200
+++ b/trollius/coroutines.py Wed Jun 11 17:29:29 2014 +0200
@@ -2,8 +2,13 @@ import functools
import inspect
import os
import sys
+try:
+ import asyncio
+except ImportError:
+ asyncio = None
-from trollius import futures
+from . import compat
+from . import futures
from .log import logger
# If you set _DEBUG to true, @coroutine will wrap the resulting
@@ -19,20 +24,37 @@ from .log import logger
and bool(os.environ.get('PYTHONASYNCIODEBUG')))
-class Return(StopIteration):
- def __init__(self, *value):
- StopIteration.__init__(self)
- if not value:
- self.value = None
- elif len(value) == 1:
- self.value = value[0]
+if compat.PY33:
+ # Don't use the Return class on Python 3.3 and later to support asyncio
+ # coroutines (to avoid the warning emited in Return destructor).
+ #
+ # The problem is that Return inherits from StopIteration. "yield from
+ # trollius_coroutine". Task._step() does not receive the Return exception,
+ # because "yield from" handles it internally. So it's not possible to set
+ # the raised attribute to True to avoid the warning in Return destructor.
+ def Return(*args):
+ if not args:
+ value = None
+ elif len(args) == 1:
+ value = args[0]
else:
- self.value = value
- self.raised = False
+ value = args
+ return StopIteration(value)
+else:
+ class Return(StopIteration):
+ def __init__(self, *args):
+ StopIteration.__init__(self)
+ if not args:
+ self.value = None
+ elif len(args) == 1:
+ self.value = args[0]
+ else:
+ self.value = args
+ self.raised = False
- def __del__(self):
- if not self.raised:
- logger.error('Return(%r) used without raise', self.value)
+ def __del__(self):
+ if not self.raised:
+ logger.error('Return(%r) used without raise', self.value)
class CoroWrapper(object):
diff -r 1fd4ec764042 -r e637d42d59b1 trollius/events.py
--- a/trollius/events.py Wed Jun 11 10:34:18 2014 +0200
+++ b/trollius/events.py Wed Jun 11 17:29:29 2014 +0200
@@ -12,6 +12,10 @@ from __future__ import absolute_import
import subprocess
import threading
import socket
+try:
+ import asyncio
+except ImportError:
+ asyncio = None
class Handle(object):
@@ -113,241 +117,246 @@ class AbstractServer(object):
return NotImplemented
-class AbstractEventLoop(object):
- """Abstract event loop."""
+if asyncio is not None:
+ # Reuse asyncio class so asyncio.set_event_loop() accepts Trollius event
+ # loops
+ AbstractEventLoop = asyncio.AbstractEventLoop
+else:
+ class AbstractEventLoop(object):
+ """Abstract event loop."""
- # Running and stopping the event loop.
+ # Running and stopping the event loop.
- def run_forever(self):
- """Run the event loop until stop() is called."""
- raise NotImplementedError
+ def run_forever(self):
+ """Run the event loop until stop() is called."""
+ raise NotImplementedError
- def run_until_complete(self, future):
- """Run the event loop until a Future is done.
+ def run_until_complete(self, future):
+ """Run the event loop until a Future is done.
- Return the Future's result, or raise its exception.
- """
- raise NotImplementedError
+ Return the Future's result, or raise its exception.
+ """
+ raise NotImplementedError
- def stop(self):
- """Stop the event loop as soon as reasonable.
+ def stop(self):
+ """Stop the event loop as soon as reasonable.
- Exactly how soon that is may depend on the implementation, but
- no more I/O callbacks should be scheduled.
- """
- raise NotImplementedError
+ Exactly how soon that is may depend on the implementation, but
+ no more I/O callbacks should be scheduled.
+ """
+ raise NotImplementedError
- def is_running(self):
- """Return whether the event loop is currently running."""
- raise NotImplementedError
+ def is_running(self):
+ """Return whether the event loop is currently running."""
+ raise NotImplementedError
- def close(self):
- """Close the loop.
+ def close(self):
+ """Close the loop.
- The loop should not be running.
+ The loop should not be running.
- This is idempotent and irreversible.
+ This is idempotent and irreversible.
- No other methods should be called after this one.
- """
- raise NotImplementedError
+ No other methods should be called after this one.
+ """
+ raise NotImplementedError
- # Methods scheduling callbacks. All these return Handles.
+ # Methods scheduling callbacks. All these return Handles.
- def call_soon(self, callback, *args):
- return self.call_later(0, callback, *args)
+ def call_soon(self, callback, *args):
+ return self.call_later(0, callback, *args)
- def call_later(self, delay, callback, *args):
- raise NotImplementedError
+ def call_later(self, delay, callback, *args):
+ raise NotImplementedError
- def call_at(self, when, callback, *args):
- raise NotImplementedError
+ def call_at(self, when, callback, *args):
+ raise NotImplementedError
- def time(self):
- raise NotImplementedError
+ def time(self):
+ raise NotImplementedError
- # Methods for interacting with threads.
+ # Methods for interacting with threads.
- def call_soon_threadsafe(self, callback, *args):
- raise NotImplementedError
+ def call_soon_threadsafe(self, callback, *args):
+ raise NotImplementedError
- def run_in_executor(self, executor, callback, *args):
- raise NotImplementedError
+ def run_in_executor(self, executor, callback, *args):
+ raise NotImplementedError
- def set_default_executor(self, executor):
- raise NotImplementedError
+ def set_default_executor(self, executor):
+ raise NotImplementedError
- # Network I/O methods returning Futures.
+ # Network I/O methods returning Futures.
- def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
- raise NotImplementedError
+ def getaddrinfo(self, host, port, family=0, type=0, proto=0, flags=0):
+ raise NotImplementedError
- def getnameinfo(self, sockaddr, flags=0):
- raise NotImplementedError
+ def getnameinfo(self, sockaddr, flags=0):
+ raise NotImplementedError
- def create_connection(self, protocol_factory, host=None, port=None,
- ssl=None, family=0, proto=0, flags=0, sock=None,
- local_addr=None, server_hostname=None):
- raise NotImplementedError
+ def create_connection(self, protocol_factory, host=None, port=None,
+ ssl=None, family=0, proto=0, flags=0, sock=None,
+ local_addr=None, server_hostname=None):
+ raise NotImplementedError
- def create_server(self, protocol_factory, host=None, port=None,
- family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
- sock=None, backlog=100, ssl=None, reuse_address=None):
- """A coroutine which creates a TCP server bound to host and port.
+ def create_server(self, protocol_factory, host=None, port=None,
+ family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE,
+ sock=None, backlog=100, ssl=None, reuse_address=None):
+ """A coroutine which creates a TCP server bound to host and port.
- The return value is a Server object which can be used to stop
- the service.
+ The return value is a Server object which can be used to stop
+ the service.
- If host is an empty string or None all interfaces are assumed
- and a list of multiple sockets will be returned (most likely
- one for IPv4 and another one for IPv6).
+ If host is an empty string or None all interfaces are assumed
+ and a list of multiple sockets will be returned (most likely
+ one for IPv4 and another one for IPv6).
- family can be set to either AF_INET or AF_INET6 to force the
- socket to use IPv4 or IPv6. If not set it will be determined
- from host (defaults to AF_UNSPEC).
+ family can be set to either AF_INET or AF_INET6 to force the
+ socket to use IPv4 or IPv6. If not set it will be determined
+ from host (defaults to AF_UNSPEC).
- flags is a bitmask for getaddrinfo().
+ flags is a bitmask for getaddrinfo().
- sock can optionally be specified in order to use a preexisting
- socket object.
+ sock can optionally be specified in order to use a preexisting
+ socket object.
- backlog is the maximum number of queued connections passed to
- listen() (defaults to 100).
+ backlog is the maximum number of queued connections passed to
+ listen() (defaults to 100).
- ssl can be set to an SSLContext to enable SSL over the
- accepted connections.
+ ssl can be set to an SSLContext to enable SSL over the
+ accepted connections.
- reuse_address tells the kernel to reuse a local socket in
- TIME_WAIT state, without waiting for its natural timeout to
- expire. If not specified will automatically be set to True on
- UNIX.
- """
- raise NotImplementedError
+ reuse_address tells the kernel to reuse a local socket in
+ TIME_WAIT state, without waiting for its natural timeout to
+ expire. If not specified will automatically be set to True on
+ UNIX.
+ """
+ raise NotImplementedError
- def create_unix_connection(self, protocol_factory, path,
- ssl=None, sock=None,
- server_hostname=None):
- raise NotImplementedError
+ def create_unix_connection(self, protocol_factory, path,
+ ssl=None, sock=None,
+ server_hostname=None):
+ raise NotImplementedError
- def create_unix_server(self, protocol_factory, path,
- sock=None, backlog=100, ssl=None):
- """A coroutine which creates a UNIX Domain Socket server.
+ def create_unix_server(self, protocol_factory, path,
+ sock=None, backlog=100, ssl=None):
+ """A coroutine which creates a UNIX Domain Socket server.
- The return value is a Server object, which can be used to stop
- the service.
+ The return value is a Server object, which can be used to stop
+ the service.
- path is a str, representing a file systsem path to bind the
- server socket to.
+ path is a str, representing a file systsem path to bind the
+ server socket to.
- sock can optionally be specified in order to use a preexisting
- socket object.
+ sock can optionally be specified in order to use a preexisting
+ socket object.
- backlog is the maximum number of queued connections passed to
- listen() (defaults to 100).
+ backlog is the maximum number of queued connections passed to
+ listen() (defaults to 100).
- ssl can be set to an SSLContext to enable SSL over the
- accepted connections.
- """
- raise NotImplementedError
+ ssl can be set to an SSLContext to enable SSL over the
+ accepted connections.
+ """
+ raise NotImplementedError
- def create_datagram_endpoint(self, protocol_factory,
- local_addr=None, remote_addr=None,
- family=0, proto=0, flags=0):
- raise NotImplementedError
+ def create_datagram_endpoint(self, protocol_factory,
+ local_addr=None, remote_addr=None,
+ family=0, proto=0, flags=0):
+ raise NotImplementedError
- # Pipes and subprocesses.
+ # Pipes and subprocesses.
- def connect_read_pipe(self, protocol_factory, pipe):
- """Register read pipe in event loop. Set the pipe to non-blocking mode.
+ def connect_read_pipe(self, protocol_factory, pipe):
+ """Register read pipe in event loop. Set the pipe to non-blocking mode.
- protocol_factory should instantiate object with Protocol interface.
- pipe is a file-like object.
- Return pair (transport, protocol), where transport supports the
- ReadTransport interface."""
- # The reason to accept file-like object instead of just file descriptor
- # is: we need to own pipe and close it at transport finishing
- # Can got complicated errors if pass f.fileno(),
- # close fd in pipe transport then close f and vise versa.
- raise NotImplementedError
+ protocol_factory should instantiate object with Protocol interface.
+ pipe is a file-like object.
+ Return pair (transport, protocol), where transport supports the
+ ReadTransport interface."""
+ # The reason to accept file-like object instead of just file descriptor
+ # is: we need to own pipe and close it at transport finishing
+ # Can got complicated errors if pass f.fileno(),
+ # close fd in pipe transport then close f and vise versa.
+ raise NotImplementedError
- def connect_write_pipe(self, protocol_factory, pipe):
- """Register write pipe in event loop.
+ def connect_write_pipe(self, protocol_factory, pipe):
+ """Register write pipe in event loop.
- protocol_factory should instantiate object with BaseProtocol interface.
- Pipe is file-like object already switched to nonblocking.
- Return pair (transport, protocol), where transport support
- WriteTransport interface."""
- # The reason to accept file-like object instead of just file descriptor
- # is: we need to own pipe and close it at transport finishing
- # Can got complicated errors if pass f.fileno(),
- # close fd in pipe transport then close f and vise versa.
- raise NotImplementedError
+ protocol_factory should instantiate object with BaseProtocol interface.
+ Pipe is file-like object already switched to nonblocking.
+ Return pair (transport, protocol), where transport support
+ WriteTransport interface."""
+ # The reason to accept file-like object instead of just file descriptor
+ # is: we need to own pipe and close it at transport finishing
+ # Can got complicated errors if pass f.fileno(),
+ # close fd in pipe transport then close f and vise versa.
+ raise NotImplementedError
- def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
- stdout=subprocess.PIPE, stderr=subprocess.PIPE,
- **kwargs):
- raise NotImplementedError
+ def subprocess_shell(self, protocol_factory, cmd, stdin=subprocess.PIPE,
+ stdout=subprocess.PIPE, stderr=subprocess.PIPE,
+ **kwargs):
+ raise NotImplementedError
- def subprocess_exec(self, protocol_factory, *args, **kwargs):
- raise NotImplementedError
+ def subprocess_exec(self, protocol_factory, *args, **kwargs):
+ raise NotImplementedError
- # Ready-based callback registration methods.
- # The add_*() methods return None.
- # The remove_*() methods return True if something was removed,
- # False if there was nothing to delete.
+ # Ready-based callback registration methods.
+ # The add_*() methods return None.
+ # The remove_*() methods return True if something was removed,
+ # False if there was nothing to delete.
- def add_reader(self, fd, callback, *args):
- raise NotImplementedError
+ def add_reader(self, fd, callback, *args):
+ raise NotImplementedError
- def remove_reader(self, fd):
- raise NotImplementedError
+ def remove_reader(self, fd):
+ raise NotImplementedError
- def add_writer(self, fd, callback, *args):
- raise NotImplementedError
+ def add_writer(self, fd, callback, *args):
+ raise NotImplementedError
- def remove_writer(self, fd):
- raise NotImplementedError
+ def remove_writer(self, fd):
+ raise NotImplementedError
- # Completion based I/O methods returning Futures.
+ # Completion based I/O methods returning Futures.
- def sock_recv(self, sock, nbytes):
- raise NotImplementedError
+ def sock_recv(self, sock, nbytes):
+ raise NotImplementedError
- def sock_sendall(self, sock, data):
- raise NotImplementedError
+ def sock_sendall(self, sock, data):
+ raise NotImplementedError
- def sock_connect(self, sock, address):
- raise NotImplementedError
+ def sock_connect(self, sock, address):
+ raise NotImplementedError
- def sock_accept(self, sock):
- raise NotImplementedError
+ def sock_accept(self, sock):
+ raise NotImplementedError
- # Signal handling.
+ # Signal handling.
- def add_signal_handler(self, sig, callback, *args):
- raise NotImplementedError
+ def add_signal_handler(self, sig, callback, *args):
+ raise NotImplementedError
- def remove_signal_handler(self, sig):
- raise NotImplementedError
+ def remove_signal_handler(self, sig):
+ raise NotImplementedError
- # Error handlers.
+ # Error handlers.
- def set_exception_handler(self, handler):
- raise NotImplementedError
+ def set_exception_handler(self, handler):
+ raise NotImplementedError
- def default_exception_handler(self, context):
- raise NotImplementedError
+ def default_exception_handler(self, context):
+ raise NotImplementedError
- def call_exception_handler(self, context):
- raise NotImplementedError
+ def call_exception_handler(self, context):
+ raise NotImplementedError
- # Debug flag management.
+ # Debug flag management.
- def get_debug(self):
- raise NotImplementedError
+ def get_debug(self):
+ raise NotImplementedError
- def set_debug(self, enabled):
- raise NotImplementedError
+ def set_debug(self, enabled):
+ raise NotImplementedError
class AbstractEventLoopPolicy(object):
diff -r 1fd4ec764042 -r e637d42d59b1 trollius/tasks.py
--- a/trollius/tasks.py Wed Jun 11 10:34:18 2014 +0200
+++ b/trollius/tasks.py Wed Jun 11 17:29:29 2014 +0200
@@ -16,7 +16,12 @@ try:
except ImportError:
# Python 2.6
from .py27_weakrefset import WeakSet
+try:
+ import asyncio
+except ImportError:
+ asyncio = None
+from . import compat
from . import events
from . import executor
from . import futures
@@ -25,6 +30,13 @@ from .coroutines import Return, From, co
from . import coroutines
+if asyncio is not None:
+ # Accept also asyncio Future objects for interoperability
+ _FUTURE_CLASSES = (futures.Future, asyncio.Future)
+else:
+ _FUTURE_CLASSES = futures.Future
+
+
@coroutine
def _lock_coroutine(lock):
yield From(lock.acquire())
@@ -221,11 +233,17 @@ class Task(futures.Future):
result = coro.send(value)
else:
result = next(coro)
- except Return as exc:
- exc.raised = True
- self.set_result(exc.value)
- except StopIteration:
- self.set_result(None)
+ except StopIteration as exc:
+ if compat.PY33:
+ # asyncio Task object? get the result of the coroutine
+ result = exc.value
+ else:
+ if isinstance(exc, Return):
+ exc.raised = True
+ result = exc.value
+ else:
+ result = None
+ self.set_result(result)
except futures.CancelledError as exc:
super(Task, self).cancel() # I.e., Future.cancel(self).
except Exception as exc:
@@ -252,7 +270,7 @@ class Task(futures.Future):
coro = _lock_coroutine(result)
result = Task(coro, loop=self._loop)
- if isinstance(result, futures.Future):
+ if isinstance(result, _FUTURE_CLASSES):
# Yielded Future must come from Future.__iter__().
result.add_done_callback(self._wakeup)
self._fut_waiter = result