Hi Guido,
 

> I am still wondering, what is your actual use case? Multiple coroutines 
> writing to the same stream sounds like the exception, not the common case 
> -- even when you pay careful attention to the appearance of yield-from, you 
> still have the problem that you can't control which of several coroutines 
> runs first. I must assume that you have a protocol over TCP/IP that has 
> some kind of framing convention, and you always ensure that a coroutine 
> writes an entire frame without yielding.
>

I have a small application that monitors our servers here and sends the
results of this monitoring to a single socket for analysis. The analysis
tool runs outside of the server farm and is not as well connected as the
servers inside the farm are amongst each other. The analysis tool is
supposed to log into the monitoring application, tell it which parameters
it intends to monitor, and then get those parameters sent over that single
socket using a framed protocol.

Now, for each parameter that I monitor, I write a coroutine (those are
typically very small, ten lines on average I guess). That coroutine
more-or-less does:

    while True:
        data = yield from get_monitored_parameter()
        writer.write(header)
        writer.write(data)
        yield from writer.drain()
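
To make the "framed" part concrete, here is a minimal sketch of one possible
frame layout. The 4-byte big-endian length prefix and the names encode_frame
and decode_frames are assumptions for illustration only, not the actual
protocol. Building header and payload into one bytes object means a frame
goes out in a single write() call.

```python
import struct

# Hypothetical frame layout: 4-byte big-endian length, then the payload.
_HEADER = struct.Struct("!I")


def encode_frame(payload: bytes) -> bytes:
    """Return header + payload as one bytes object for a single write()."""
    return _HEADER.pack(len(payload)) + payload


def decode_frames(buffer: bytes):
    """Split a buffer into complete payloads.

    Returns (payloads, rest), where rest holds the bytes of an incomplete
    trailing frame that should be kept until more data arrives.
    """
    payloads = []
    while len(buffer) >= _HEADER.size:
        (length,) = _HEADER.unpack(buffer[:_HEADER.size])
        end = _HEADER.size + length
        if len(buffer) < end:
            break  # frame not complete yet
        payloads.append(buffer[_HEADER.size:end])
        buffer = buffer[end:]
    return payloads, buffer
```

With such a helper, the loop body would call writer.write(encode_frame(data))
once per frame, and the receiving side would feed whatever it reads into
decode_frames.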

I wrote my own drain coroutine (mostly like the locks you proposed), so
that I can call it from several tasks, and this gives me simple flow
control. It could be a good idea to add such locks to the standard library
drain, but I would also understand if people think that this is too special
a use case. I added a working patch to this post just in case.
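
The same idea can also be sketched outside the library as a small wrapper.
This is only an illustration in current async/await syntax, not the patch
itself: the class name LockedDrain is made up, and the only assumption about
the writer is that it has a drain() coroutine, as StreamWriter does.

```python
import asyncio


class LockedDrain:
    """Serialize drain() calls coming from several tasks on one writer.

    Hypothetical helper: it wraps any object that offers a drain()
    coroutine and lets only one task wait in drain() at a time.
    """

    def __init__(self, writer):
        self._writer = writer
        self._lock = asyncio.Lock()

    async def drain(self):
        # Tasks queue up on the lock instead of all entering drain() at once.
        async with self._lock:
            await self._writer.drain()
```

Each monitoring task would then share one LockedDrain instance and await its
drain() after writing a frame, instead of calling writer.drain() directly.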

> I wonder if you could just use a bare Transport/Protocol pair instead of a 
> StreamWriter? In that case you'd have to implement pause_writing() and 
> resume_writing() in your Protocol class if you want to have the equivalent 
> of drain(). (But do you? Is there nothing at a higher level in your 
> protocol to prevent your writing coroutines from overwhelming the other end 
> of the socket?)
>

I'm using StreamWriter because this is what start_server returns... I want
to keep things simple; my code is currently 345 lines and works. I think
the write-drain loop is a flow control concept that is really easy to
grasp, which is why I went for this solution.

Sure, I could write some handshaking to avoid overwhelming the other end of
the socket, but TCP already does that for me, so why bother?

Greetings

Martin

My promised patch follows:

--- streams2.py 2015-06-11 16:49:14.851855051 +0200
+++ streams.py  2015-06-15 11:04:35.119700288 +0200
@@ -13,6 +13,7 @@
 from . import coroutines
 from . import events
 from . import futures
+from . import locks
 from . import protocols
 from .coroutines import coroutine
 from .log import logger
@@ -151,6 +152,7 @@
             self._loop = loop
         self._paused = False
         self._drain_waiter = None
+        self._drain_lock = locks.Lock(loop=self._loop)
         self._connection_lost = False
 
     def pause_writing(self):
@@ -191,13 +193,12 @@
     def _drain_helper(self):
         if self._connection_lost:
             raise ConnectionResetError('Connection lost')
-        if not self._paused:
-            return
-        waiter = self._drain_waiter
-        assert waiter is None or waiter.cancelled()
-        waiter = futures.Future(loop=self._loop)
-        self._drain_waiter = waiter
-        yield from waiter
+        with (yield from self._drain_lock):
+            if not self._paused:
+                return
+            waiter = futures.Future(loop=self._loop)
+            self._drain_waiter = waiter
+            yield from waiter
 
 
 class StreamReaderProtocol(FlowControlMixin, protocols.Protocol):
