Hello, I've been playing with the new tubes that are being implemented:

http://comments.gmane.org/gmane.comp.python.twisted/27248
https://twistedmatrix.com/trac/ticket/1956
Here are a few things that I did with it. I won't publish the full code now, as in its current shape it could implode the eyeballs of the Twisted devs and possibly make them summon some of the elder gods, but I'll see if I can produce something less vile as I merge the ongoing changes to the tubes branch.

So far I wrote a relatively simple app that reads logfiles, parses them and inserts what it gets out of them into a database.

The first issue that I dealt with was stopping the tubes. When I have read the whole of the input I want to wait until all of it has been parsed (synchronous code at the moment, but I can imagine e.g. some expensive processing being done in a thread / external process) and then wait until it has been committed to the database before shutting the reactor down cleanly. As of #42908, which I pulled for experimenting, the support for passing flowStopped(reason) through the pipeline (or series, if you want) was not working; an issue with None being returned from stopped() ended the processing prematurely, which I fixed with:

=== modified file 'tubes7/tube.py'
--- tubes7/tube.py	2014-08-01 18:32:48 +0000
+++ tubes7/tube.py	2014-08-01 21:20:44 +0000
@@ -441,6 +446,8 @@
             downstream.flowStopped(f)
             return
         if iterableOrNot is None:
+            if self._flowStoppingReason is not None:
+                self._tfount.drain.flowStopped(self._flowStoppingReason)
             return 0
         self._pendingIterator = iter(iterableOrNot)
         if self._tfount.drain is None:

Also the ProtocolFount didn't really do what it should, so I made it implement IHalfCloseableProtocol and made it call flowStopped() accordingly. One more thing I did was make it invoke flowStopped() on any drain that is newly attached to it -- apparently when I used the stdio endpoint it managed to close the connection when reading from /dev/null even before I managed to set up the series/pipeline. That still didn't make it possible for me to wait on the DB being written to properly.
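To illustrate the half-close handling, here is a stripped-down, dependency-free sketch of what I mean. In the real ProtocolFount this is done by declaring twisted.internet.interfaces.IHalfCloseableProtocol and passing a proper Failure; the class name and the plain-string reason here are purely illustrative:

```python
# Sketch of a fount-like protocol that forwards a read-side half-close
# to its drain as flowStopped(), and remembers the stop if it happens
# before any drain is attached (the /dev/null stdio race above).
class HalfCloseAwareFount(object):
    def __init__(self):
        self.drain = None
        self._stop_reason = None  # set if we stopped before a drain attached

    def flowTo(self, drain):
        self.drain = drain
        result = drain.flowingFrom(self)
        # If the read side already closed, deliver the stop to the
        # newly attached drain right away instead of losing it.
        if self._stop_reason is not None:
            drain.flowStopped(self._stop_reason)
        return result

    def dataReceived(self, data):
        if self.drain is not None:
            self.drain.receive(data)

    # IHalfCloseableProtocol: the peer closed its write side, so no more
    # data will arrive -- propagate that as flowStopped().
    def readConnectionLost(self):
        reason = "read side closed"  # a Failure(ConnectionDone()) in real code
        if self.drain is not None:
            self.drain.flowStopped(reason)
        else:
            self._stop_reason = reason

    def writeConnectionLost(self):
        pass  # we can still read; nothing for the fount side to do
```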
What I had to do was implement a CloseableDrain that has a waitDone() method emitting a Deferred that fires once the drain's flowStopped() has been called and everything it should do has been done. This makes it quite handy to use from a react()-style setup, since I can just return this Deferred, or a DeferredList of all the ongoing pipelines.

For the next pipeline I had one more issue: this pipeline can be run either as a log reader, or as an essential part of the running program that emits such logs. In the latter case I need to generate confirmation messages for specific entries that are being inserted and send them back to the originator, after they have been safely written to the DB. This I resolved by adding another field to the values I pass into the PostgreSQLDrain -- a Deferred that will be fired as txpostgres's runOperation finishes. This works pretty well, but it took me quite a while to come up with, so I'm not sure if it's an intuitive design pattern or if we could come up with something better.

Then I had to run both pipelines in parallel. After implementing the fan-in pattern (fan-out was already done by glyph), I wrote this helper function:

def parallel(*tubes):
    out = Out()
    in_ = In()
    out._drain.nextFount = in_
    for tube in tubes:
        out.newFount().flowTo(series(tube, in_.newDrain()))
    in_.stop_on_empty = True
    return out.drain

The nextFount attribute on _OutDrain is what is returned from flowingFrom(), so this function can be used as part of a series. What I'm unsure about is how to handle stopping of the fan-in. Currently I don't make it stop until stop_on_empty is set (so I can add/remove things during its initialization), and then I make it stop when the last fount that's flowing in has stopped (and been removed from the input founts set); I use the reason that fount passes into flowStopped() to propagate along to the rest of the series, effectively discarding the reason objects passed by all the founts except the last one.
What I'll have to deal with is a lack of sensible flow control in some parts of the code. For example, the part that generates the log files should not be stopped just because there's some delay in writing the logs. This made me wonder whether the flow control, and perhaps the processing confirmation, should be run not as a part of the main interface but instead as something that runs alongside it, where applicable, in the opposite direction. I don't have any specific API in mind at the moment, though. On the other hand, both are perfectly solvable with the current design: implementing FIFO buffers or message droppers for flow control, and the above-mentioned Deferred passing for confirmations.

As for the data representation that I chose to pass between the tubes, I started with simple namedtuples, and following that I built a simple "datatype" class somewhat reminiscent of https://github.com/hynek/characteristic (which I learned of a few moments after I finished polishing my own implementation). What I have there is a layer above namedtuples that autogenerates zope Interfaces (so I can have adaptation), does field type and value validation/adaptation, and could possibly (as a future extension) provide an easy way to turn them into AMP commands so the series can be split into communicating processes as needed. (What would be interesting, imo, is something like ampoule for tubes, or perhaps a ThreadTube and a SubprocessTube for performing blocking operations.)

Also maybe of note is the implementation of Pipes in the Async library for OCaml, which I've been examining lately. What they seem to do there is push values downstream, and the function called in each processing step may return a deferred signifying that a pause is requested until that deferred is fired. For those interested in the details, refer to https://ocaml.janestreet.com/ocaml-core/111.25.00/doc/async/#Std.Pipe and the relevant section of the Real World OCaml book (available online).
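To make the message-dropper flow-control workaround concrete, here is a plain-Python sketch, independent of the tubes API (a real tube would receive items via received() and yield them downstream; the class and attribute names are illustrative):

```python
from collections import deque

class DroppingBuffer(object):
    """Keep at most maxsize pending items, dropping the oldest when full,
    so a slow downstream never blocks the producer (e.g. the log writer)."""

    def __init__(self, maxsize):
        # deque with maxlen silently evicts from the left on overflow
        self.pending = deque(maxlen=maxsize)
        self.dropped = 0

    def received(self, item):
        if len(self.pending) == self.pending.maxlen:
            self.dropped += 1  # count what we are about to evict
        self.pending.append(item)

    def drain(self):
        # Hand everything buffered so far to the downstream stage.
        items = list(self.pending)
        self.pending.clear()
        return items
```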
Looking forward to further tubes development :-)

CcxCZ (freenode) | Jan Pobříslo (IRL)

_______________________________________________
Twisted-Python mailing list
Twisted-Python@twistedmatrix.com
http://twistedmatrix.com/cgi-bin/mailman/listinfo/twisted-python