Hi Guido,
> I am still wondering, what is your actual use case? Multiple coroutines
> writing to the same stream sounds like the exception, not the common case
> -- even when you pay careful attention to the appearance of yield-from, you
> still have the problem that you can't control which of several coroutines
> runs first. I must assume that you have a protocol over TCP/IP that has
> some kind of framing convention, and you always ensure that a coroutine
> writes an entire frame without yielding.
>
I have a small application that monitors our servers here, and sends the
results of this monitoring to a single socket for analysis. The
analysis tool runs outside of the server farm and is not as well connected
as the servers inside the farm are amongst each other. The analysis
tool is supposed to log into the monitoring application, tell it which
parameters it intends to monitor, and then get those parameters sent over
that single socket using a framed protocol.
Now, for each parameter that I monitor, I write a coroutine (those are
typically very small, ten lines on average I guess). That coroutine
more-or-less does
    while True:
        data = yield from get_monitored_parameter()
        writer.write(header)
        writer.write(data)
        yield from writer.drain()
I wrote my own drain coroutine (mostly like the locks you proposed), so
that I can call it from several tasks, and this gives me simple flow
control. It could be a good idea to add such locks to the standard library
drain, but I can also see that people might think this is too special a
use case. I added a working patch to this post just in case.
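To illustrate the idea outside the library, here is a minimal sketch, not the patch itself. It assumes the newer async/await syntax rather than the yield-from style used in this thread, and the names SerializedDrain, FakeWriter, monitor and demo are hypothetical, made up for this example. The wrapper holds a lock around drain(), so several tasks can call it while only one of them waits on the underlying writer at a time.

```python
import asyncio


class SerializedDrain:
    """Hypothetical wrapper: lets many tasks call drain() on one writer."""

    def __init__(self, writer):
        self._writer = writer
        self._lock = asyncio.Lock()

    def write(self, data):
        # write() never yields, so a header/data pair written back to back
        # cannot be interleaved with another task's frame.
        self._writer.write(data)

    async def drain(self):
        # Only the lock holder waits on the real drain(); other tasks queue
        # up on the lock instead.
        async with self._lock:
            await self._writer.drain()


class FakeWriter:
    """Stand-in for a StreamWriter so the sketch runs without a socket."""

    def __init__(self):
        self.chunks = []
        self.active_drains = 0
        self.max_active_drains = 0

    def write(self, data):
        self.chunks.append(data)

    async def drain(self):
        self.active_drains += 1
        self.max_active_drains = max(self.max_active_drains, self.active_drains)
        await asyncio.sleep(0)  # give other tasks a chance to run
        self.active_drains -= 1


async def monitor(writer, header, payload, rounds):
    # Same shape as the write-drain loop above, with a fixed payload.
    for _ in range(rounds):
        writer.write(header)
        writer.write(payload)
        await writer.drain()


async def demo():
    fake = FakeWriter()
    writer = SerializedDrain(fake)
    await asyncio.gather(
        monitor(writer, b"H1", b"cpu", 3),
        monitor(writer, b"H2", b"mem", 3),
    )
    return fake


if __name__ == "__main__":
    result = asyncio.run(demo())
    print(result.max_active_drains, len(result.chunks))
```

With the lock in place, the fake writer never sees more than one drain() in progress, and each header is still followed directly by its own data.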
> I wonder if you could just use a bare Transport/Protocol pair instead of a
> StreamWriter? In that case you'd have to implement pause_writing() and
> resume_writing() in your Protocol class if you want to have the equivalent
> of drain(). (But do you? Is there nothing at a higher level in your
> protocol to prevent your writing coroutines from overwhelming the other end
> of the socket?)
>
I'm using StreamWriter because this is what start_server returns... I want
to keep things simple; my code is currently 345 lines and works. I think
the write-drain loop is a flow control concept that is really easy to
grasp, which is why I went for this solution.
Sure, I could write some handshaking to avoid overwhelming the other end of
the socket, but TCP already does that for me, so why bother?
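For comparison, the bare Transport/Protocol route mentioned above could look roughly like the following. This is only a sketch under assumptions: it uses async/await syntax, the class name DrainableProtocol and its write_frame() and drain() methods are hypothetical, and connection-loss handling is left out. The event loop calls pause_writing() and resume_writing() when the transport's buffer crosses its high and low water marks; an Event lets any number of tasks wait for the resume.

```python
import asyncio


class DrainableProtocol(asyncio.Protocol):
    """Hypothetical protocol offering a drain() usable from several tasks."""

    def __init__(self):
        self._transport = None
        self._can_write = asyncio.Event()
        self._can_write.set()  # writing is allowed until the loop pauses us

    def connection_made(self, transport):
        self._transport = transport

    def pause_writing(self):
        # Called by the loop when the transport's write buffer is too full.
        self._can_write.clear()

    def resume_writing(self):
        # Called by the loop once the buffer has drained far enough.
        self._can_write.set()

    def write_frame(self, header, data):
        # A single write() call keeps header and data together.
        self._transport.write(header + data)

    async def drain(self):
        # Returns at once unless writing is currently paused.
        await self._can_write.wait()
```

This is more code than the StreamWriter loop and would still need to raise an error from drain() after connection_lost().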
Greetings
Martin
My promised patch follows:
--- streams2.py 2015-06-11 16:49:14.851855051 +0200
+++ streams.py 2015-06-15 11:04:35.119700288 +0200
@@ -13,6 +13,7 @@
 from . import coroutines
 from . import events
 from . import futures
+from . import locks
 from . import protocols
 from .coroutines import coroutine
 from .log import logger
@@ -151,6 +152,7 @@
         self._loop = loop
         self._paused = False
         self._drain_waiter = None
+        self._drain_lock = locks.Lock(loop=self._loop)
         self._connection_lost = False
 
     def pause_writing(self):
@@ -191,13 +193,12 @@
     def _drain_helper(self):
         if self._connection_lost:
             raise ConnectionResetError('Connection lost')
-        if not self._paused:
-            return
-        waiter = self._drain_waiter
-        assert waiter is None or waiter.cancelled()
-        waiter = futures.Future(loop=self._loop)
-        self._drain_waiter = waiter
-        yield from waiter
+        with (yield from self._drain_lock):
+            if not self._paused:
+                return
+            waiter = futures.Future(loop=self._loop)
+            self._drain_waiter = waiter
+            yield from waiter
 
 
 class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):