Richard Purdie <[email protected]> wrote on Saturday, 16/04/2022 at 22:57:
> On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
> > For the FetchConnectionCache, use a queue where each thread can
> > get an unused connection_cache that is properly initialized before
> > we fire up the ThreadPoolExecutor.
> >
> > For the progress bar we need an additional task counter
> > that is protected with a thread lock, as it runs inside the
> > ThreadPoolExecutor.
> >
> > Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
> >
> > Signed-off-by: Jose Quaresma <[email protected]>
> > ---
> >  meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
> >  1 file changed, 28 insertions(+), 16 deletions(-)
>
> Are there specific issues you see with oe.utils.ThreadedPool that this
> change addresses? Were you able to reproduce the issue in 14775?

Looking deeper while testing the patch, I think I found another bug in the
sstate mirror handling: the Python set() is not thread safe, and we use one
inside the thread pool. I added a new thread-safe set class for that in my V2.

I don't know whether it is related to 14775, but it could be. I can't
reproduce 14775 on my side, so maybe it's better to remove the 14775
mention from my commits. What do you think?

> I'm a little concerned we swap one implementation where we know roughly
> what the issues are for another where we don't :/.

I think there are some issues in ThreadedPool's worker_init and worker_end:
these functions are called in every worker, and it seems to me the right
thing to do is to reuse the previously created connection_cache; otherwise
the connection_cache achieves nothing.

> I notice that ThreadPoolExecutor can take an initializer but you're doing
> this using the queue instead. Is that because you suspect some issue with
> those being set up in the separate threads?

I am using a queue because it is the easiest way I found to reuse the
FetchConnectionCache objects.

> You also mentioned the debug messages not showing. That suggests something
> is wrong with the event handlers in the new threading model and that
> errors wouldn't propagate either, so we need to check into that.

This is fixed in V2.

> This is definitely an interesting idea but I'm nervous about it :/.

It would be interesting if you could test it in the autobuilder. On my side
it is working well now, and I will send a V2.
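To make the set issue concrete, here is a minimal sketch of the kind of
lock-protected set wrapper I have in mind; the class name and method list
are hypothetical and the actual V2 code may differ:

    import threading

    class ThreadedSet:
        # Minimal lock-protected wrapper around a built-in set, so that
        # concurrent mutations from the executor workers are serialized.
        def __init__(self):
            self._lock = threading.Lock()
            self._set = set()

        def add(self, item):
            with self._lock:
                self._set.add(item)

        def remove(self, item):
            with self._lock:
                self._set.remove(item)

        def __contains__(self, item):
            with self._lock:
                return item in self._set

        def __len__(self):
            with self._lock:
                return len(self._set)

In checkstatus() both found.add(tid) and missed.remove(tid) run concurrently
in the executor workers, which is where a wrapper like this would slot in.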
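On the initializer question: a per-thread cache could also be set up with
the executor's initializer hook plus thread-local storage, roughly as in the
untested sketch below (nproc and tasklist are the same variables as in the
patch, and FetchConnectionCache comes from bb.fetch2):

    import concurrent.futures
    import threading

    from bb.fetch2 import FetchConnectionCache  # same import as in the patch

    _tls = threading.local()

    def worker_init():
        # Runs once in each worker thread the executor creates, so every
        # thread gets its own private connection cache.
        _tls.connection_cache = FetchConnectionCache()

    def checkstatus(arg):
        (tid, sstatefile) = arg
        # ... same body as in the patch, but reading _tls.connection_cache
        # instead of calling connection_cache_pool.get()/put() ...

    with concurrent.futures.ThreadPoolExecutor(max_workers=nproc,
                                               initializer=worker_init) as executor:
        executor.map(checkstatus, tasklist)
    # Caveat: there is no per-thread "finalizer" hook, so the caches created
    # in worker_init() have no clean place to be close_connections()'d.

The drawback is that caveat: with the queue, all the caches can instead be
drained and closed in checkstatus_end() after the pool exits.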
Jose

> Cheers,
>
> Richard
>
> > diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
> > index 1c0cae4893..0ede078770 100644
> > --- a/meta/classes/sstate.bbclass
> > +++ b/meta/classes/sstate.bbclass
> > @@ -977,15 +977,22 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
> >          localdata.delVar('BB_NO_NETWORK')
> >
> >      from bb.fetch2 import FetchConnectionCache
> > -    def checkstatus_init(thread_worker):
> > -        thread_worker.connection_cache = FetchConnectionCache()
> > -
> > -    def checkstatus_end(thread_worker):
> > -        thread_worker.connection_cache.close_connections()
> > -
> > -    def checkstatus(thread_worker, arg):
> > +    def checkstatus_init():
> > +        while not connection_cache_pool.full():
> > +            connection_cache_pool.put(FetchConnectionCache())
> > +
> > +    def checkstatus_end():
> > +        while not connection_cache_pool.empty():
> > +            connection_cache = connection_cache_pool.get()
> > +            connection_cache.close_connections()
> > +
> > +    import threading
> > +    _lock = threading.Lock()
> > +    def checkstatus(arg):
> >          (tid, sstatefile) = arg
> >
> > +        connection_cache = connection_cache_pool.get()
> > +
> >          localdata2 = bb.data.createCopy(localdata)
> >          srcuri = "file://" + sstatefile
> >          localdata2.setVar('SRC_URI', srcuri)
> > @@ -995,7 +1002,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
> >
> >          try:
> >              fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
> > -                            connection_cache=thread_worker.connection_cache)
> > +                            connection_cache=connection_cache)
> >              fetcher.checkstatus()
> >              bb.debug(2, "SState: Successful fetch test for %s" % srcuri)
> >              found.add(tid)
> > @@ -1005,8 +1012,12 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
> >          except Exception as e:
> >              bb.error("SState: cannot test %s: %s\n%s" % (srcuri, repr(e), traceback.format_exc()))
> >
> > +        connection_cache_pool.put(connection_cache)
> > +
> >          if progress:
> > -            bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - thread_worker.tasks.qsize()), d)
> > +            with _lock:
> > +                tasks -= 1
> > +            bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - tasks), d)
> >
> >      tasklist = []
> >      for tid in missed:
> > @@ -1016,6 +1027,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
> >      if tasklist:
> >          nproc = min(int(d.getVar("BB_NUMBER_THREADS")), len(tasklist))
> >
> > +        tasks = len(tasklist)
> >          progress = len(tasklist) >= 100
> >          if progress:
> >              msg = "Checking sstate mirror object availability"
> > @@ -1025,13 +1037,13 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
> >          fetcherenv = bb.fetch2.get_fetcher_environment(d)
> >          with bb.utils.environment(**fetcherenv):
> >              bb.event.enable_threadlock()
> > -            pool = oe.utils.ThreadedPool(nproc, len(tasklist),
> > -                    worker_init=checkstatus_init, worker_end=checkstatus_end,
> > -                    name="sstate_checkhashes-")
> > -            for t in tasklist:
> > -                pool.add_task(checkstatus, t)
> > -            pool.start()
> > -            pool.wait_completion()
> > +            import concurrent.futures
> > +            from queue import Queue
> > +            connection_cache_pool = Queue(nproc)
> > +            checkstatus_init()
> > +            with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
> > +                executor.map(checkstatus, tasklist)
> > +            checkstatus_end()
> >              bb.event.disable_threadlock()
> >
> >      if progress:
>

--
Best regards,
José Quaresma
