Hi,

One thing that yet doesn't work and I don't know why is the bitbake debug
messages inside the ThreadPoolExecutor.

Jose

Jose Quaresma via lists.openembedded.org <quaresma.jose=
[email protected]> escreveu no dia sábado, 16/04/2022 à(s)
21:24:

> for the FetchConnectionCache use a queue where each thread can
> get an unsed connection_cache that is properly initialized before
> we fireup the ThreadPoolExecutor.
>
> for the progress bar we need an adictional task counter
> that is protected with thread lock as it runs inside the
> ThreadPoolExecutor.
>
> Fixes [YOCTO #14775] --
> https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
>
> Signed-off-by: Jose Quaresma <[email protected]>
> ---
>  meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
>  1 file changed, 28 insertions(+), 16 deletions(-)
>
> diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
> index 1c0cae4893..0ede078770 100644
> --- a/meta/classes/sstate.bbclass
> +++ b/meta/classes/sstate.bbclass
> @@ -977,15 +977,22 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
>              localdata.delVar('BB_NO_NETWORK')
>
>          from bb.fetch2 import FetchConnectionCache
> -        def checkstatus_init(thread_worker):
> -            thread_worker.connection_cache = FetchConnectionCache()
> -
> -        def checkstatus_end(thread_worker):
> -            thread_worker.connection_cache.close_connections()
> -
> -        def checkstatus(thread_worker, arg):
> +        def checkstatus_init():
> +            while not connection_cache_pool.full():
> +                connection_cache_pool.put(FetchConnectionCache())
> +
> +        def checkstatus_end():
> +            while not connection_cache_pool.empty():
> +                connection_cache = connection_cache_pool.get()
> +                connection_cache.close_connections()
> +
> +        import threading
> +        _lock = threading.Lock()
> +        def checkstatus(arg):
>              (tid, sstatefile) = arg
>
> +            connection_cache = connection_cache_pool.get()
> +
>              localdata2 = bb.data.createCopy(localdata)
>              srcuri = "file://" + sstatefile
>              localdata2.setVar('SRC_URI', srcuri)
> @@ -995,7 +1002,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
>
>              try:
>                  fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
> -
> connection_cache=thread_worker.connection_cache)
> +                            connection_cache=connection_cache)
>                  fetcher.checkstatus()
>                  bb.debug(2, "SState: Successful fetch test for %s" %
> srcuri)
>                  found.add(tid)
> @@ -1005,8 +1012,12 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
>              except Exception as e:
>                  bb.error("SState: cannot test %s: %s\n%s" % (srcuri,
> repr(e), traceback.format_exc()))
>
> +            connection_cache_pool.put(connection_cache)
> +
>              if progress:
> -                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist)
> - thread_worker.tasks.qsize()), d)
> +                with _lock:
> +                    tasks -= 1
> +                bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist)
> - tasks), d)
>
>          tasklist = []
>          for tid in missed:
> @@ -1016,6 +1027,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
>          if tasklist:
>              nproc = min(int(d.getVar("BB_NUMBER_THREADS")), len(tasklist))
>
> +            tasks = len(tasklist)
>              progress = len(tasklist) >= 100
>              if progress:
>                  msg = "Checking sstate mirror object availability"
> @@ -1025,13 +1037,13 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
>              fetcherenv = bb.fetch2.get_fetcher_environment(d)
>              with bb.utils.environment(**fetcherenv):
>                  bb.event.enable_threadlock()
> -                pool = oe.utils.ThreadedPool(nproc, len(tasklist),
> -                        worker_init=checkstatus_init,
> worker_end=checkstatus_end,
> -                        name="sstate_checkhashes-")
> -                for t in tasklist:
> -                    pool.add_task(checkstatus, t)
> -                pool.start()
> -                pool.wait_completion()
> +                import concurrent.futures
> +                from queue import Queue
> +                connection_cache_pool = Queue(nproc)
> +                checkstatus_init()
> +                with
> concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
> +                    executor.map(checkstatus, tasklist)
> +                checkstatus_end()
>                  bb.event.disable_threadlock()
>
>              if progress:
> --
> 2.35.3
>
>
> 
>
>

-- 
Best regards,

José Quaresma
-=-=-=-=-=-=-=-=-=-=-=-
Links: You receive all messages sent to this group.
View/Reply Online (#164560): 
https://lists.openembedded.org/g/openembedded-core/message/164560
Mute This Topic: https://lists.openembedded.org/mt/90512350/21656
Group Owner: [email protected]
Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub 
[[email protected]]
-=-=-=-=-=-=-=-=-=-=-=-

Reply via email to