On Sat, 2022-04-16 at 21:24 +0100, Jose Quaresma wrote:
> for the FetchConnectionCache use a queue where each thread can
> get an unsed connection_cache that is properly initialized before
> we fireup the ThreadPoolExecutor.
>
> for the progress bar we need an adictional task counter
> that is protected with thread lock as it runs inside the
> ThreadPoolExecutor.
>
> Fixes [YOCTO #14775] --
> https://bugzilla.yoctoproject.org/show_bug.cgi?id=14775
>
> Signed-off-by: Jose Quaresma <[email protected]>
> ---
> meta/classes/sstate.bbclass | 44 +++++++++++++++++++++++--------------
> 1 file changed, 28 insertions(+), 16 deletions(-)
Are there specific issues you see with oe.utils.ThreadedPool that this change
addresses? Were you able to reproduce the issue in 14775?
I'm a little concerned we swap one implementation where we know roughly what the
issues are for another where we dont :/.
I notice that ThreadPoolExecutor can take an initializer but you're doing this
using the queue instead. Is that because you suspect some issue with those being
setup in the separate threads?
You also mentioned the debug messages not showing. That suggests something is
wrong with the event handlers in the new threading model and that errors
wouldn't propagate either so we need to check into that.
This is definitely an interesting idea but I'm nervous about it :/.
Cheers,
Richard
> diff --git a/meta/classes/sstate.bbclass b/meta/classes/sstate.bbclass
> index 1c0cae4893..0ede078770 100644
> --- a/meta/classes/sstate.bbclass
> +++ b/meta/classes/sstate.bbclass
> @@ -977,15 +977,22 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
> localdata.delVar('BB_NO_NETWORK')
>
> from bb.fetch2 import FetchConnectionCache
> - def checkstatus_init(thread_worker):
> - thread_worker.connection_cache = FetchConnectionCache()
> -
> - def checkstatus_end(thread_worker):
> - thread_worker.connection_cache.close_connections()
> -
> - def checkstatus(thread_worker, arg):
> + def checkstatus_init():
> + while not connection_cache_pool.full():
> + connection_cache_pool.put(FetchConnectionCache())
> +
> + def checkstatus_end():
> + while not connection_cache_pool.empty():
> + connection_cache = connection_cache_pool.get()
> + connection_cache.close_connections()
> +
> + import threading
> + _lock = threading.Lock()
> + def checkstatus(arg):
> (tid, sstatefile) = arg
>
> + connection_cache = connection_cache_pool.get()
> +
> localdata2 = bb.data.createCopy(localdata)
> srcuri = "file://" + sstatefile
> localdata2.setVar('SRC_URI', srcuri)
> @@ -995,7 +1002,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
>
> try:
> fetcher = bb.fetch2.Fetch(srcuri.split(), localdata2,
> - connection_cache=thread_worker.connection_cache)
> + connection_cache=connection_cache)
> fetcher.checkstatus()
> bb.debug(2, "SState: Successful fetch test for %s" % srcuri)
> found.add(tid)
> @@ -1005,8 +1012,12 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
> except Exception as e:
> bb.error("SState: cannot test %s: %s\n%s" % (srcuri,
> repr(e), traceback.format_exc()))
>
> + connection_cache_pool.put(connection_cache)
> +
> if progress:
> - bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) -
> thread_worker.tasks.qsize()), d)
> + with _lock:
> + tasks -= 1
> + bb.event.fire(bb.event.ProcessProgress(msg, len(tasklist) -
> tasks), d)
>
> tasklist = []
> for tid in missed:
> @@ -1016,6 +1027,7 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
> if tasklist:
> nproc = min(int(d.getVar("BB_NUMBER_THREADS")), len(tasklist))
>
> + tasks = len(tasklist)
> progress = len(tasklist) >= 100
> if progress:
> msg = "Checking sstate mirror object availability"
> @@ -1025,13 +1037,13 @@ def sstate_checkhashes(sq_data, d, siginfo=False,
> currentcount=0, summary=True,
> fetcherenv = bb.fetch2.get_fetcher_environment(d)
> with bb.utils.environment(**fetcherenv):
> bb.event.enable_threadlock()
> - pool = oe.utils.ThreadedPool(nproc, len(tasklist),
> - worker_init=checkstatus_init,
> worker_end=checkstatus_end,
> - name="sstate_checkhashes-")
> - for t in tasklist:
> - pool.add_task(checkstatus, t)
> - pool.start()
> - pool.wait_completion()
> + import concurrent.futures
> + from queue import Queue
> + connection_cache_pool = Queue(nproc)
> + checkstatus_init()
> + with
> concurrent.futures.ThreadPoolExecutor(max_workers=nproc) as executor:
> + executor.map(checkstatus, tasklist)
> + checkstatus_end()
> bb.event.disable_threadlock()
>
> if progress:
>
>
-=-=-=-=-=-=-=-=-=-=-=-
Links: You receive all messages sent to this group.
View/Reply Online (#164561):
https://lists.openembedded.org/g/openembedded-core/message/164561
Mute This Topic: https://lists.openembedded.org/mt/90512350/21656
Group Owner: [email protected]
Unsubscribe: https://lists.openembedded.org/g/openembedded-core/unsub
[[email protected]]
-=-=-=-=-=-=-=-=-=-=-=-