I ran into the following traceback:
Traceback (most recent call last):
File "/usr/lib/pypy3/lib-python/3/concurrent/futures/process.py", line
102, in _python_exit
thread_wakeup.wakeup()
File "/usr/lib/pypy3/lib-python/3/concurrent/futures/process.py", line
90, in wakeup
self._writer.send_bytes(b"")
File "/usr/lib/pypy3/lib-python/3/multiprocessing/connection.py", line
183, in send_bytes
self._check_closed()
File "/usr/lib/pypy3/lib-python/3/multiprocessing/connection.py", line
136, in _check_closed
raise OSError("handle is closed")
OSError: handle is closed
debug: OperationError:
debug: operror-type: OSError
debug: operror-value: handle is closed
The code is:
@pytest.mark.asyncio
async def test_peace_search_store():
    """Index three small documents in a pstore, then check keyword searches."""
    import multiprocessing
    from found import pstore
    from concurrent import futures

    db = await open()

    worker_count = multiprocessing.cpu_count()
    with futures.ProcessPoolExecutor(max_workers=worker_count) as pool:
        # The store is built around the pool, so indexing and searching
        # both stay inside the executor's lifetime.
        store = pstore.make('test-pstore', (42,), pool)

        documents = (
            dict(foundationdb=1, okvs=2, database=42),
            dict(sqlite=1, sql=2, database=3),
            dict(spam=42),
        )
        for uid, doc in enumerate(documents):
            await found.transactional(db, pstore.index, store, uid, doc)

        # (keywords, expected hits) pairs, checked in this order.
        cases = (
            (["foundationdb"], [(0, 1)]),
            (["spam"], [(2, 42)]),
            (["database"], [(0, 42), (1, 3)]),
        )
        for keywords, expected in cases:
            out = await found.transactional(
                db, pstore.search, store, keywords, 10
            )
            assert out == expected
Also, pstore.search calls the following pool_for_each_par_map:
from aiostream import pipe, stream
import asyncio
async def pool_for_each_par_map(loop, pool, f, p, iterator):
    """Run ``p`` over ``iterator`` in ``pool`` and pass each result to ``f``.

    Every item is turned into a future with
    ``loop.run_in_executor(pool, p, item)``.  Futures are awaited in batches
    with ``asyncio.wait(..., return_when=FIRST_COMPLETED)``; for each future
    that finishes, ``f`` is called with its result and one more item is
    pulled from the stream, so the number of futures awaited at once stays
    at or below ``pool._max_workers`` (checked by an assertion).

    Args:
        loop: Event loop whose ``run_in_executor`` schedules the work.
        pool: Executor handed to ``run_in_executor``.
        f: Callable invoked with each result, in completion order.
        p: Callable executed in ``pool`` for each item.
        iterator: Source of items, wrapped with ``stream.iterate``.

    Raises:
        Any exception surfaced by ``finish.result()`` for a failed future.
    """
    zx = stream.iterate(iterator)
    zx = zx | pipe.map(lambda x: loop.run_in_executor(pool, p, x))
    async with zx.stream() as streamer:
        # NOTE: relies on the executor's private ``_max_workers`` attribute.
        limit = pool._max_workers
        unfinished = []
        exhausted = False
        while True:
            tasks = []
            if not exhausted:
                for _ in range(limit):
                    try:
                        task = await streamer.__anext__()
                    except StopAsyncIteration:
                        # Stop pulling for good: the stream must not be
                        # polled again once it has reported exhaustion.
                        exhausted = True
                        break
                    tasks.append(task)
            # Futures still pending from the previous round are awaited again.
            tasks.extend(unfinished)
            assert len(tasks) <= pool._max_workers
            if not tasks:
                break
            finished, unfinished = await asyncio.wait(
                tasks, return_when=asyncio.FIRST_COMPLETED
            )
            for finish in finished:
                f(finish.result())
            # Refill exactly as many slots as were just freed.
            limit = len(finished)
You can reproduce it with the following CLI steps:
git clone --branch=rework
https://github.com/amirouche/asyncio-foundationdb/
cd asyncio-foundationdb
make init
poetry run make check
I am not sure how to proceed. I can try nightly builds, but which ones:
py3.7 or py3.8?
Feedback welcome.
_______________________________________________
pypy-dev mailing list
pypy-dev@python.org
https://mail.python.org/mailman/listinfo/pypy-dev