I ran into the following traceback:

Traceback (most recent call last):
  File "/usr/lib/pypy3/lib-python/3/concurrent/futures/process.py", line 102, in _python_exit
    thread_wakeup.wakeup()
  File "/usr/lib/pypy3/lib-python/3/concurrent/futures/process.py", line 90, in wakeup
    self._writer.send_bytes(b"")
  File "/usr/lib/pypy3/lib-python/3/multiprocessing/connection.py", line 183, in send_bytes
    self._check_closed()
  File "/usr/lib/pypy3/lib-python/3/multiprocessing/connection.py", line 136, in _check_closed
    raise OSError("handle is closed")
OSError: handle is closed
debug: OperationError:
debug:  operror-type: OSError
debug:  operror-value: handle is closed
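
For context, the _python_exit atexit hook tries to wake up the executor's
management thread through a multiprocessing pipe at interpreter shutdown;
presumably that pipe was already closed earlier, for example when the
"with" block shut the ProcessPoolExecutor down. A minimal sketch of that
failure mode, independent of found and pstore:

from multiprocessing import Pipe

# Simulate what the shutdown hook hits: writing to a wakeup pipe whose
# write end has already been closed.
reader, writer = Pipe(duplex=False)
writer.close()
writer.send_bytes(b"")  # OSError: handle is closed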


The code is:


@pytest.mark.asyncio
async def test_peace_search_store():
    import multiprocessing
    import found
    from found import pstore
    from concurrent import futures

    db = await found.open()

    with futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as pool:
        store = pstore.make('test-pstore', (42,), pool)

        DOC0 = dict(
            foundationdb=1,
            okvs=2,
            database=42,
        )
        DOC1 = dict(
            sqlite=1,
            sql=2,
            database=3
        )
        DOC2 = dict(
            spam=42,
        )
        for uid, doc in enumerate((DOC0, DOC1, DOC2)):
            await found.transactional(db, pstore.index, store, uid, doc)

        expected = [(0, 1)]
        out = await found.transactional(db, pstore.search, store, ["foundationdb"], 10)
        assert out == expected

        expected = [(2, 42)]
        out = await found.transactional(db, pstore.search, store, ["spam"], 10)
        assert out == expected

        expected = [(0, 42), (1, 3)]
        out = await found.transactional(db, pstore.search, store, ["database"], 10)
        assert out == expected
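
To check whether the interpreter-shutdown failure depends on found or
FoundationDB at all, a stripped-down test along these lines (the worker
function "work" is a hypothetical stand-in) exercises the same
ProcessPoolExecutor lifecycle on PyPy:

import asyncio
import multiprocessing
from concurrent import futures


def work(x):
    # Hypothetical picklable job standing in for the pstore worker.
    return x * x


async def main():
    loop = asyncio.get_event_loop()
    with futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as pool:
        futs = [loop.run_in_executor(pool, work, i) for i in range(8)]
        out = await asyncio.gather(*futs)
    assert out == [i * i for i in range(8)]


asyncio.run(main())

If the "handle is closed" traceback still appears at exit with this script,
the problem lives in the executor shutdown path rather than in pstore.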


Also, pstore.search calls the following pool_for_each_par_map helper:

from aiostream import pipe, stream
import asyncio


async def pool_for_each_par_map(loop, pool, f, p, iterator):
    # Map the (picklable) function p over iterator using the process pool,
    # calling f with each result as soon as it is available. At most
    # pool._max_workers submissions are in flight at any time.
    zx = stream.iterate(iterator)
    zx = zx | pipe.map(lambda x: loop.run_in_executor(pool, p, x))
    async with zx.stream() as streamer:
        limit = pool._max_workers
        unfinished = []
        while True:
            tasks = []
            for i in range(limit):
                try:
                    task = await streamer.__anext__()
                except StopAsyncIteration:
                    # The input is exhausted: stop pulling new work.
                    limit = 0
                else:
                    tasks.append(task)
            # Wait on the newly submitted futures plus those still running.
            tasks = tasks + list(unfinished)
            assert len(tasks) <= pool._max_workers
            if not tasks:
                break
            finished, unfinished = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for finish in finished:
                out = finish.result()
                f(out)
            # Refill only as many slots as were just freed.
            limit = len(finished)
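
For reference, a self-contained way to drive pool_for_each_par_map outside
of pstore.search; the worker "double" and the results list are hypothetical
stand-ins for what pstore does:

import asyncio
import multiprocessing
from concurrent import futures


def double(x):
    # Hypothetical CPU-bound job submitted to the process pool.
    return x * 2


async def main():
    loop = asyncio.get_event_loop()
    results = []
    with futures.ProcessPoolExecutor(max_workers=multiprocessing.cpu_count()) as pool:
        await pool_for_each_par_map(loop, pool, results.append, double, range(10))
    # Completion order is not guaranteed, hence the sort.
    assert sorted(results) == [x * 2 for x in range(10)]


asyncio.run(main())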


You can reproduce it with the following CLI dance:

  git clone --branch=rework https://github.com/amirouche/asyncio-foundationdb/
  cd asyncio-foundationdb
  make init
  poetry run make check

I am not sure how to proceed. I could try nightly builds, but which ones: py3.7 or py3.8?

Feedback welcome.