On Fri, Oct 30, 2020 at 6:39 PM Tin Tvrtković <tinches...@gmail.com> wrote:

> A small update on this, since I've been playing with it.
>
> I'm trying to implement a websocket proxy, since it's an example of a toy
> project that needs to juggle two long-lived asyncio connections at once.
> I'm using Starlette/Uvicorn for the server part (the part that accepts the
> connection) and aiohttp's client functionality to connect to an echo server
> on the Internet.
>
> I've settled on the async iterator interface as a good building block for
> this, not queues. It turns out an async iterator looks very much like the read
> interface of a Go channel (both yield values and signal their closure).
> Aiohttp already provides an async iterator interface for reading from the
> upstream server. Starlette doesn't, but an adapter is very easy to write.
>
> Now that we've settled on a suitable interface, a helper function
> (wait_on) is easy to write. (Code omitted for brevity.) This is the entire
> request handler:
>
> async def ws_proxy(client: WebSocket):
>     await client.accept()
>     async with ClientSession() as session:
>         async with session.ws_connect("wss://echo.websocket.org") as s:
>             c = starlette_websocket_iterator(client)
>             async for r in wait_on(c, s):
>                 match r:
>                     case (src, None):
>                         print(f"{src} closed the connection")
>                         break
>                     case (src, msg) if src is c:
>                         print(f"CLIENT: {msg}")
>                         await s.send_str(msg)
>                     case (src, msg) if src is s:
>                         print(f"SERVER: {msg}")
>                         await client.send_text(msg.data)
>

To compare against the non-match approach:

async for src, msg in wait_on(c, s):
    if msg is None:
        print(f"{src} closed the connection")
        break
    elif src is c:
        print(f"CLIENT: {msg}")
        await s.send_str(msg)
    elif src is s:
        print(f"SERVER: {msg}")
        await client.send_text(msg.data)
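
Either way, here is roughly what I imagine the omitted wait_on helper looks
like. This is just my own sketch built on asyncio.wait and __anext__, not
Tin's actual code:

import asyncio

async def wait_on(*iterators):
    # Multiplex several async iterators, yielding (source, value) pairs and
    # (source, None) once a source is exhausted (i.e. that connection closed).
    tasks = {asyncio.create_task(it.__anext__()): it for it in iterators}
    try:
        while tasks:
            done, _ = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
            for task in done:
                source = tasks.pop(task)
                try:
                    value = task.result()
                except StopAsyncIteration:
                    yield source, None
                    continue
                # Re-arm the source we just read from, then hand the value out.
                tasks[asyncio.create_task(source.__anext__())] = source
                yield source, value
    finally:
        # If the consumer breaks out early, cancel whatever is still pending.
        for task in tasks:
            task.cancel()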

-Brett


>
> So yeah, the helper yields tuples of the source and message, using None as
> a sentinel for closure. Guards are used to match on the source, using
> iterator identity. My first version just used `case (s, msg):` hoping to
> match on the identity of s, but that doesn't work since a bare name in a
> pattern is a capture, not a comparison against the existing variable.
>
> I'd say this is pretty cool. With the ecosystem moving to async iterators
> for streams of data (or just writing adapters), I'd say this style of
> programming is quite readable and understandable.
>
> On Tue, Oct 27, 2020 at 1:16 AM Tin Tvrtković <tinches...@gmail.com>
> wrote:
>
>> Hello,
>>
>> Go channels are indeed very similar to asyncio Queues, with some added
>> features like channels being closable. (There is also a special receive form
>> usable in the select statement, `val, ok := <-ch`, that will set the `ok`
>> variable to false once the channel has been closed and drained.) A larger
>> difference, I think, is
>> that in Go channels are used practically everywhere, more so than asyncio
>> Queues. They are an abstraction the vast majority of Go concurrency is
>> built upon.
>>
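(For illustration only: a "closable" asyncio queue along those lines might
look roughly like the sketch below. This is my own toy example, not an
existing asyncio API.)

import asyncio

_CLOSED = object()  # sentinel meaning "no more items will arrive"

class ClosableQueue(asyncio.Queue):
    # Toy sketch of a closable queue, loosely mirroring Go's
    # `val, ok := <-ch` receive form.

    def close(self):
        self.put_nowait(_CLOSED)

    async def receive(self):
        # Returns (value, True), or (None, False) once the queue is closed.
        item = await self.get()
        if item is _CLOSED:
            self.put_nowait(_CLOSED)  # keep the sentinel around for other readers
            return None, False
        return item, True
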
>> Building this for asyncio tasks, instead of just queues, would be much
>> more useful in Python.
>>
>> Contemplating this some more, I would agree we don't need an async match.
>> A function and some types to match on would probably be enough to get us
>> close to a select statement in a PEP 634 Python. I guess the challenge is
>> designing these matchable types for ease of use now, and I need to study
>> the pattern matching PEPs in more detail to be able to contribute here.
>>
>> On one hand, this means this problem can be solved by a third party
>> library. On the other hand, I feel like this would be very useful, so it
>> might be worth having somewhere in the stdlib asyncio namespace.
>>
>> Since `asyncio.wait` can yield multiple tasks in the completed set, this
>> would probably have to be wrapped in an `async for`.
>>
>>
>>
>> On Mon, Oct 26, 2020 at 12:33 PM Gustavo Carneiro <gjcarne...@gmail.com>
>> wrote:
>>
>>> It's true that asyncio.wait provides the tools that you need, but it's a
>>> bit clunky to use correctly.
>>>
>>> Maybe it would be something along the lines of:
>>>
>>> ------
>>> queue1 = asyncio.Queue()
>>> queue2 = asyncio.Queue()
>>> ...
>>> get1 = asyncio.create_task(queue1.get())
>>> get2 = asyncio.create_task(queue2.get())
>>> await asyncio.wait({get1, get2}, return_when=asyncio.FIRST_COMPLETED)
>>> match [task.done() for task in (get1, get2)]:
>>>     case [True, False]:  get2.cancel(); item1 = await get1; ....
>>>     case [False, True]:  get1.cancel(); item2 = await get2; ....
>>>     case [True, True]:  item1 = await get1; ....; item2 = await get2; ....
>>> ------
>>>
>>> If asyncio.Queue() is the equivalent of Go channels, perhaps it would be
>>> worth designing a new API for asyncio.Queue, one that is better suited to
>>> the match statement:
>>>
>>> class Queue:
>>>    async def read_wait(self) -> 'Queue':
>>>        """
>>>        Waits until the queue has at least one item ready to read,
>>>        without actually consuming the item.
>>>        """
>>>
>>> Then we could more easily use the match statement with multiple queues, thus:
>>>
>>> ------
>>> async def ready_queue(*queues: asyncio.Queue) -> asyncio.Queue:
>>>    """
>>>    Takes multiple queues and waits for at least one of them to have
>>>    items pending to read, returning that queue.
>>>    """
>>>    await asyncio.wait({queue.read_wait() for queue in queues},
>>>                       return_when=asyncio.FIRST_COMPLETED)
>>>    for queue in queues:
>>>       if queue.qsize() > 0:
>>>           return queue
>>>
>>> ...
>>>
>>> queue1 = asyncio.Queue()
>>> queue2 = asyncio.Queue()
>>>
>>> ...
>>>
>>> match await ready_queue(queue1, queue2):
>>>     case q if q is queue1:  item1 = queue1.get_nowait(); ....
>>>     case q if q is queue2:  item2 = queue2.get_nowait(); ....
>>> ------
>>>
>>> Which is less clunky, maybe?...
>>>
>>> The above is not 100% bug free.  I think those queue.get_nowait() calls
>>> may still end up raising QueueEmpty exceptions, in case there is another
>>> concurrent reader for those queues.  This code would need more work, most
>>> likely.
>>>
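(As an aside, a read_wait()-style queue could probably be prototyped on top of
today's asyncio.Queue along the lines below. This is only my guess at an
implementation, and it inherits the same caveat about concurrent readers.)

import asyncio

class ReadWaitQueue(asyncio.Queue):
    # Rough prototype of the proposed read_wait() API. It relies on put()/get()
    # delegating to the *_nowait methods, as they do in CPython today.

    def __init__(self, maxsize=0):
        super().__init__(maxsize)
        self._nonempty = asyncio.Event()

    def put_nowait(self, item):
        super().put_nowait(item)
        self._nonempty.set()

    def get_nowait(self):
        item = super().get_nowait()
        if self.empty():
            self._nonempty.clear()
        return item

    async def read_wait(self) -> 'ReadWaitQueue':
        # Wait until at least one item is available, without consuming it.
        await self._nonempty.wait()
        return self
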
>>> In any case, perhaps it's not the match statement that needs to change,
>>> but rather the asyncio API that needs to be enhanced.
>>>
>>>
>>> On Sun, 25 Oct 2020 at 01:14, Nick Coghlan <ncogh...@gmail.com> wrote:
>>>
>>>> On Sat., 24 Oct. 2020, 4:21 am Guido van Rossum, <gu...@python.org>
>>>> wrote:
>>>>
>>>>> On Fri, Oct 23, 2020 at 6:19 AM Tin Tvrtković <tinches...@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> first of all, I'm a big fan of the changes being proposed here since
>>>>>> in my code I prefer the 'union' style of logic over the OO style.
>>>>>>
>>>>>> I was curious, though, if there are any plans for the match operator
>>>>>> to support async stuff. I'm interested in the problem of waiting on
>>>>>> multiple asyncio tasks concurrently, and having a branch of code execute
>>>>>> depending on the task.
>>>>>>
>>>>>> Currently this can be done by using asyncio.wait, looping over the
>>>>>> done set and executing an if-else chain there, but this is quite 
>>>>>> tiresome.
>>>>>> Go has a select statement (https://tour.golang.org/concurrency/5)
>>>>>> that looks like this:
>>>>>>
>>>>>> select {
>>>>>> case <-ch1:
>>>>>>     fmt.Println("Received from ch1")
>>>>>> case <-ch2:
>>>>>>     fmt.Println("Received from ch2")
>>>>>> }
>>>>>>
>>>>>> Speaking personally, this is a Go feature I miss a lot when writing
>>>>>> asyncio code. The syntax is similar to what's being proposed here. 
>>>>>> Although
>>>>>> it could be a separate thing added later, async match, I guess.
>>>>>>
>>>>>
>>>>> Hadn't seen this before. You could propose this as a follow-up for
>>>>> 3.11. But aren't Go channels more like asyncio Queues? I guess we'd need
>>>>> way more in terms of a worked-out example (using asyncio code, not Go 
>>>>> code).
>>>>>
>>>>
>>>> I think we'd also want to see how far folks get with using guard
>>>> clauses for this kind of "where did the data come from?" check - the only
>>>> specifically asynchronous bit would be the "await multiple tasks"
>>>> operation, and you can already tell asyncio.wait() to return on the first
>>>> completed task rather than waiting for all the results.
>>>>
>>>> Cheers,
>>>> Nick.
>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Gustavo J. A. M. Carneiro
>>> Gambit Research
>>> "The universe is always one step beyond logic." -- Frank Herbert
>>>
>
_______________________________________________
Python-Dev mailing list -- python-dev@python.org
To unsubscribe send an email to python-dev-le...@python.org
https://mail.python.org/mailman3/lists/python-dev.python.org/
Message archived at 
https://mail.python.org/archives/list/python-dev@python.org/message/XPFBW4UZ7KRUDZTZOWX2RLX6SXWPT6GN/
Code of Conduct: http://python.org/psf/codeofconduct/
