Richard Purdie <[email protected]> escreveu no dia sábado,
16/04/2022 à(s) 22:57:

> On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
> > For the FetchConnectionCache, use a queue where each thread can
> > get an unused connection_cache that is properly initialized before
> > we fire up the ThreadPoolExecutor.
> >
> > For the progress bar we need an additional task counter
> > that is protected with a thread lock, as it runs inside the
> > ThreadPoolExecutor.
> >
> > Fixes [YOCTO #14775] -- https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
> >
> > Signed-off-by: Jose Quaresma <[email protected]>
> > ---
> >  meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
> >  1 file changed, 28 insertions(+), 16 deletions(-)
>
> Are there specific issues you see with oe.utils.ThreadedPool that this
> change
> addresses? Were you able to reproduce the issue in 14775?
>

While testing the patch more deeply, I think I found another bug in the
sstate mirror handling: the Python set() is not thread-safe, and we use
one inside the thread pool. I added a new thread-safe set class for that
in my V2.
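
The set issue can be illustrated with a small lock-protected wrapper. This is only a sketch of the idea; the class name is illustrative and not the actual V2 code (single set operations are atomic under CPython's GIL, but a lock makes the intent explicit and keeps compound operations safe):

```python
import threading

class ThreadSafeSet:
    """Illustrative set wrapper that serializes access with a lock."""
    def __init__(self):
        self._lock = threading.Lock()
        self._set = set()

    def add(self, item):
        with self._lock:
            self._set.add(item)

    def __contains__(self, item):
        with self._lock:
            return item in self._set

    def __len__(self):
        with self._lock:
            return len(self._set)

# Example: several threads adding concurrently, no items lost
s = ThreadSafeSet()
threads = [threading.Thread(target=lambda: [s.add(i) for i in range(100)])
           for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```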

I don't know whether it is related to 14775, but it could be. I can't
reproduce 14775 on my side; maybe it's better to remove the 14775 mention
from my commits. What do you think?


> I'm a little concerned we swap one implementation where we know roughly
> what the
> issues are for another where we dont :/.
>

I think there are some issues in ThreadedPool around worker_init and
worker_end: these functions are called in all workers, and it seems to me
that the right thing to do is to reuse the previously created
connection_cache objects; otherwise the connection_cache does nothing.


>
> I notice that ThreadPoolExecutor can take an initializer but you're doing
> this
> using the queue instead. Is that because you suspect some issue with those
> being
> setup in the separate threads?
>

I am using a queue because it is the easiest way I found to reuse the
FetchConnectionCache objects.
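
The queue idea can be sketched like this (ConnCache is a stand-in I made up for bb.fetch2.FetchConnectionCache; only the close_connections() name is taken from the real API):

```python
import concurrent.futures
import queue

class ConnCache:
    """Stand-in for bb.fetch2.FetchConnectionCache (assumed API)."""
    def __init__(self):
        self.closed = False
    def close_connections(self):
        self.closed = True

nproc = 4
pool = queue.Queue(nproc)
for _ in range(nproc):           # pre-populate before any thread starts
    pool.put(ConnCache())

def check(arg):
    cache = pool.get()           # borrow a cache; blocks if all are in use
    try:
        return arg * 2           # real code would run fetcher.checkstatus()
    finally:
        pool.put(cache)          # return it so another task can reuse it

with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as ex:
    results = list(ex.map(check, range(10)))

closed = []
while not pool.empty():          # tear down all caches at the end
    c = pool.get()
    c.close_connections()
    closed.append(c)
```

Because the queue is filled before the executor starts and every task returns its cache in a finally block, each connection cache is reused across tasks instead of being created and thrown away per worker.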


>
> You also mentioned the debug messages not showing. That suggests something
> is
> wrong with the event handlers in the new threading model and that errors
> wouldn't propagate either so we need to check into that.
>

This is fixed in V2.


>
> This is definitely an interesting idea but I'm nervous about it :/.
>

It would be interesting if you could test it on the autobuilder.
On my side it is working well now; I will send a V2.
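
One note on the lock-protected progress counter from the commit message: since Python treats a name that is assigned inside a function as local, a counter decremented inside the worker needs a nonlocal (or, at module level, global) declaration, otherwise it raises UnboundLocalError. A minimal sketch of the counter pattern:

```python
import concurrent.futures
import threading

tasklist = list(range(10))
tasks = len(tasklist)
_lock = threading.Lock()
progress = []

def check(arg):
    global tasks                 # a nested function would use `nonlocal` instead
    with _lock:
        tasks -= 1               # decrement is not atomic, so hold the lock
        done = len(tasklist) - tasks
    progress.append(done)        # real code would fire ProcessProgress(msg, done)

with concurrent.futures.ThreadPoolExecutor(max_workers=4) as ex:
    list(ex.map(check, tasklist))
```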

Jose


>
> Cheers,
>
> Richard
>
>
>
> > diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
> > index 1c0cae4893..0ede078770 100644
> > --- a/meta/classes/sstate.bbclass
> > +++ b/meta/classes/sstate.bbclass
> > @@ -977,15 +977,22 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
> >              localdata.delVar('BB_NO_NETWORK')
> >
> >          from bb.fetch2 import FetchConnectionCache
> > -        def checkstatus_init(thread_worker):
> > -            thread_worker.connection_cache = FetchConnectionCache()
> > -
> > -        def checkstatus_end(thread_worker):
> > -            thread_worker.connection_cache.close_connections()
> > -
> > -        def checkstatus(thread_worker, arg):
> > +        def checkstatus_init():
> > +            while not connection_cache_pool.full():
> > +                connection_cache_pool.put(FetchConnectionCache())
> > +
> > +        def checkstatus_end():
> > +            while not connection_cache_pool.empty():
> > +                connection_cache = connection_cache_pool.get()
> > +                connection_cache.close_connections()
> > +
> > +        import threading
> > +        _lock = threading.Lock()
> > +        def checkstatus(arg):
> >              (tid, sstatefile) = arg
> >
> > +            connection_cache = connection_cache_pool.get()
> > +
> >              localdata2 = bb.data.createCopy(localdata)
> >              srcuri = "file://" + sstatefile
> >              localdata2.setVar('SRC_URI', srcuri)
> > @@ -995,7 +1002,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
> >
> >              try:
> >                  fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
> > -                            connection_cache=thread_worker.connection_cache)
> > +                            connection_cache=connection_cache)
> >                  fetcher.checkstatus()
> >                  bb.debug(2, "SState: Successful fetch test for %s" % srcuri)
> >                  found.add(tid)
> > @@ -1005,8 +1012,12 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
> >              except Exception as e:
> >                  bb.error("SState: cannot test %s: %s\n%s" % (srcuri, repr(e), traceback.format_exc()))
> >
> > +            connection_cache_pool.put(connection_cache)
> > +
> >              if progress:
> > -                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - thread_worker.tasks.qsize()), d)
> > +                with _lock:
> > +                    tasks -= 1
> > +                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) - tasks), d)
> >
> >          tasklist = []
> >          for tid in missed:
> > @@ -1016,6 +1027,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
> >          if tasklist:
> >              nproc = min(int(d.getVar("BB_NUMBER_THREADS")), len(tasklist))
> >
> > +            tasks = len(tasklist)
> >              progress = len(tasklist) >= 100
> >              if progress:
> >                  msg = "Checking sstate mirror object availability"
> > @@ -1025,13 +1037,13 @@ def sstate_checkhashes(sq_data, d, siginfo=False, currentcount=0, summary=True,
> >              fetcherenv = bb.fetch2.get_fetcher_environment(d)
> >              with bb.utils.environment(**fetcherenv):
> >                  bb.event.enable_threadlock()
> > -                pool = oe.utils.ThreadedPool(nproc, len(tasklist),
> > -                        worker_init=checkstatus_init, worker_end=checkstatus_end,
> > -                        name="sstate_checkhashes-")
> > -                for t in tasklist:
> > -                    pool.add_task(checkstatus, t)
> > -                pool.start()
> > -                pool.wait_completion()
> > +                import concurrent.futures
> > +                from queue import Queue
> > +                connection_cache_pool = Queue(nproc)
> > +                checkstatus_init()
> > +                with concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
> > +                    executor.map(checkstatus, tasklist)
> > +                checkstatus_end()
> >                  bb.event.disable_threadlock()
> >
> >              if progress:
> > 
> >
>
>
>

-- 
Best regards,

José Quaresma
View/Reply Online (#164562):
https://lists.openembedded.org/g/openembedded-core/message/164562