Hey all!
This is my first post to this group. I'm excited about Tulip in general,
and especially how you can write async code with coroutines. I've used the
NDB library for App Engine a lot. I'm trying to fit my understanding on to
Tulip.
One behavior I noticed was if you use asyncio.async(), you also need to
yield from asyncio.sleep(0) to get the "fast dispatch" behavior I'm used to
from NDB. What I want is to start running a sub-task's eventloop until its
first yield, and *then* return the future for that task to the caller.
This behavior has been great for async RPC systems where getting the RPC
packed up and ready for the wire is not I/O bound. Instead the RPC client
just grabs a mutex and puts the packed-up data on a queue for another
thread.
To reproduce the behavior I want to see, I wrote a silly "greatest common
denominator" program using Tulip. Here's a snippet from the program I've
attached that demonstrates fast dispatch:
@asyncio.coroutine
def int_divide(numerator, denominator):
print('int_divide stared: %r %r' % (numerator, denominator))
divide = '%d/%d' % (numerator, denominator)
quotient_future = asyncio.async(bc_child(divide))
mod = '%d%%%d' % (numerator, denominator)
mod_future = asyncio.async(bc_child(mod))
# bc_child calls above won't run until we yield to the event loop
yield from asyncio.sleep(0)
print('Waiting on int_divide futures')
quotient = yield from quotient_future
remainder = yield from mod_future
return int(quotient), int(remainder)
With the yield from asyncio.sleep(0) line present you see this output:
gcd started: 1238482 410
int_divide stared: 1238482 410
bc_child child stared: '1238482/410'
run_child stared: 'bc'
bc_child child stared: '1238482%410'
run_child stared: 'bc'
Waiting on int_divide futures << important line
1238482 / 410 = 3020 remainder 282
With the yield from asyncio.sleep(0) line commented out, you see this
output:
gcd started: 1238482 410
int_divide stared: 1238482 410
Waiting on int_divide futures << important line
bc_child child stared: '1238482/410'
run_child stared: 'bc'
bc_child child stared: '1238482%410'
run_child stared: 'bc'
1238482 / 410 = 3020 remainder 282
Note how the location of "Waiting on int_divide futures" moves to the top
when the sleep is commented out. That's the slow dispatch case. It means
none of the sub-task event loops start until I wait on at least one of
their futures.
Imagine I want to fire off two async RPCs and then do a bunch of
computation and then wait for both RPCs to finish. Without the sleep() call
my computation would delay the RPCs from ever starting. With the sleep call
these fast dispatch RPCs will work.
What's the best practice here? Am I doing it wrong? Doing explicit sleep()
calls seems really gross. My ideal would be a decorator to cause this fast
dispatch behavior, but then again I noticed that the @task decorator is now
gone.
Anyways, thanks in advance for the help!
-Brett
#!/usr/bin/env python3
#
# Copyright 2013 Brett Slatkin
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import asyncio
class Child(asyncio.SubprocessProtocol):
def __init__(self):
self.exited = asyncio.Future()
self.data = []
def pipe_data_received(self, fd, data):
self.data.append(data)
def process_exited(self):
self.exited.set_result(None)
@asyncio.coroutine
def run_child(*args, input_data=None):
print('run_child stared: %r' % args)
loop = asyncio.get_event_loop()
transport, protocol = yield from loop.subprocess_exec(Child, *args)
if input_data:
stdin = transport.get_pipe_transport(asyncio.STDIN)
stdin.write(input_data.encode('utf-8')) # Async and non-blocking via eventloop
stdin.close() # Will happen after async write finishes
yield from protocol.exited # This is annoying
return transport.get_returncode(), b''.join(protocol.data).decode('utf-8')
@asyncio.coroutine
def bc_child(bc_command):
print('bc_child child stared: %r' % bc_command)
status, data = yield from run_child('bc', input_data=bc_command + '\n')
if status or 'parse error' in data:
raise Exception('Command %s failed with status %d:' % (
bc_command, status, data))
return data
@asyncio.coroutine
def int_divide(numerator, denominator):
print('int_divide stared: %r %r' % (numerator, denominator))
divide = '%d/%d' % (numerator, denominator)
quotient_future = asyncio.async(bc_child(divide))
mod = '%d%%%d' % (numerator, denominator)
mod_future = asyncio.async(bc_child(mod))
# bc_child calls above won't run until we yield to the event loop
yield from asyncio.sleep(0)
print('Waiting on int_divide futures')
quotient = yield from quotient_future
remainder = yield from mod_future
return int(quotient), int(remainder)
@asyncio.coroutine
def gcd(a, b):
print('gcd started: %r %r' % (a, b))
real_a = max(a, b)
real_b = min(a, b)
quotient, remainder = yield from int_divide(real_a, real_b)
print('%d / %d = %d remainder %d' % (real_a, real_b, quotient, remainder))
if remainder == 0:
return real_b
return (yield from gcd(real_b, remainder))
loop = asyncio.get_event_loop()
future = gcd(1238482, 410)
result = loop.run_until_complete(future)
print('Result', result)