vlc | branch: master | Romain Vimont <[email protected]> | Wed Sep 2 17:06:41 2020 +0200| [f197dd4046648c7d18eeb565235c44eff567d8fc] | committer: Alexandre Janniaux
fetcher: use vlc_executor_t Replace the background workers by executors. Signed-off-by: Alexandre Janniaux <[email protected]> > http://git.videolan.org/gitweb.cgi/vlc.git/?a=commit;h=f197dd4046648c7d18eeb565235c44eff567d8fc --- src/preparser/fetcher.c | 379 ++++++++++++++++++++++++++---------------------- 1 file changed, 207 insertions(+), 172 deletions(-) diff --git a/src/preparser/fetcher.c b/src/preparser/fetcher.c index 5bb4aa1cef..7616078ff8 100644 --- a/src/preparser/fetcher.c +++ b/src/preparser/fetcher.c @@ -31,43 +31,118 @@ #include <vlc_threads.h> #include <vlc_memstream.h> #include <vlc_meta_fetcher.h> +#include <vlc_executor.h> #include "art.h" #include "libvlc.h" #include "fetcher.h" #include "input/input_interface.h" -#include "misc/background_worker.h" #include "misc/interrupt.h" struct input_fetcher_t { - struct background_worker* local; - struct background_worker* network; - struct background_worker* downloader; + vlc_executor_t *executor_local; + vlc_executor_t *executor_network; + vlc_executor_t *executor_downloader; vlc_dictionary_t album_cache; vlc_object_t* owner; + vlc_mutex_t lock; + struct vlc_list submitted_tasks; /**< list of struct task */ }; -struct fetcher_request { +struct task { + input_fetcher_t *fetcher; + vlc_executor_t *executor; input_item_t* item; - vlc_atomic_rc_t rc; int options; const input_fetcher_callbacks_t *cbs; void *userdata; + + vlc_interrupt_t interrupt; + + struct vlc_runnable runnable; /**< to be passed to the executor */ + struct vlc_list node; /**< node of input_fetcher_t.submitted_tasks */ }; -struct fetcher_thread { - void (*pf_worker)( input_fetcher_t*, struct fetcher_request* ); +static void RunDownloader(void *); +static void RunSearchLocal(void *); +static void RunSearchNetwork(void *); + +static struct task * +TaskNew(input_fetcher_t *fetcher, vlc_executor_t *executor, input_item_t *item, + input_item_meta_request_option_t options, + const input_fetcher_callbacks_t *cbs, void *userdata) +{ + struct task *task = malloc(sizeof(*task)); + if (!task) + return NULL; + + task->fetcher = fetcher; + task->executor = executor; + task->item = item; + task->options = options; + task->cbs = cbs; + task->userdata = userdata; - struct background_worker* worker; - struct fetcher_request* req; - input_fetcher_t* fetcher; + vlc_interrupt_init(&task->interrupt); - vlc_interrupt_t interrupt; - vlc_thread_t thread; - atomic_bool active; -}; + input_item_Hold(item); + + if (executor == fetcher->executor_local) + task->runnable.run = RunSearchLocal; + else if (executor == fetcher->executor_network) + task->runnable.run = RunSearchNetwork; + else + { + assert(executor == fetcher->executor_downloader); + task->runnable.run = RunDownloader; + } + + task->runnable.userdata = task; + + return task; +} + +static void +TaskDelete(struct task *task) +{ + input_item_Release(task->item); + vlc_interrupt_deinit(&task->interrupt); + free(task); +} + +static void +FetcherAddTask(input_fetcher_t *fetcher, struct task *task) +{ + vlc_mutex_lock(&fetcher->lock); + vlc_list_append(&task->node, &fetcher->submitted_tasks); + vlc_mutex_unlock(&fetcher->lock); +} + +static void +FetcherRemoveTask(input_fetcher_t *fetcher, struct task *task) +{ + vlc_mutex_lock(&fetcher->lock); + vlc_list_remove(&task->node); + vlc_mutex_unlock(&fetcher->lock); +} + +static int +Submit(input_fetcher_t *fetcher, vlc_executor_t *executor, input_item_t *item, + input_item_meta_request_option_t options, + const input_fetcher_callbacks_t *cbs, void *userdata) +{ + struct task *task = + TaskNew(fetcher, executor, item, options, cbs, userdata); + if (!task) + return VLC_ENOMEM; + + FetcherAddTask(fetcher, task); + vlc_executor_Submit(task->executor, &task->runnable); + + return VLC_SUCCESS; +} static char* CreateCacheKey( input_item_t* item ) { @@ -191,13 +266,13 @@ static int SearchArt( input_fetcher_t* fetcher, input_item_t* item, int scope) return CheckArt( item ); } -static int SearchByScope( input_fetcher_t* fetcher, - struct fetcher_request* req, int scope ) +static int SearchByScope(struct task *task, int scope) { - input_item_t* item = req->item; + input_fetcher_t *fetcher = task->fetcher; + input_item_t* item = task->item; if( CheckMeta( item ) && - InvokeModule( fetcher, req->item, scope, "meta fetcher" ) ) + InvokeModule( fetcher, item, scope, "meta fetcher" ) ) { return VLC_EGENERIC; } @@ -208,26 +283,32 @@ static int SearchByScope( input_fetcher_t* fetcher, ! input_FindArtInCache( item ) || ! SearchArt( fetcher, item, scope ) ) { - AddAlbumCache( fetcher, req->item, false ); - if( !background_worker_Push( fetcher->downloader, req, NULL, 0 ) ) + AddAlbumCache( fetcher, task->item, false ); + int ret = Submit(fetcher, fetcher->executor_downloader, item, + task->options, task->cbs, task->userdata); + if (ret == VLC_SUCCESS) return VLC_SUCCESS; } return VLC_EGENERIC; } -static void NotifyArtFetchEnded( struct fetcher_request* req, bool fetched ) +static void NotifyArtFetchEnded(struct task *task, bool fetched) { - if (req->cbs && req->cbs->on_art_fetch_ended) - req->cbs->on_art_fetch_ended(req->item, fetched, req->userdata); + if (task->cbs && task->cbs->on_art_fetch_ended) + task->cbs->on_art_fetch_ended(task->item, fetched, task->userdata); } -static void Downloader( input_fetcher_t* fetcher, - struct fetcher_request* req ) +static void RunDownloader(void *userdata) { - ReadAlbumCache( fetcher, req->item ); + struct task *task = userdata; + input_fetcher_t *fetcher = task->fetcher; + + vlc_interrupt_set(&task->interrupt); + + ReadAlbumCache( fetcher, task->item ); - char *psz_arturl = input_item_GetArtURL( req->item ); + char *psz_arturl = input_item_GetArtURL( task->item ); if( !psz_arturl ) goto error; @@ -266,151 +347,85 @@ static void Downloader( input_fetcher_t* fetcher, goto error; } - input_SaveArt( fetcher->owner, req->item, output_stream.ptr, + input_SaveArt( fetcher->owner, task->item, output_stream.ptr, output_stream.length, NULL ); free( output_stream.ptr ); - AddAlbumCache( fetcher, req->item, true ); + AddAlbumCache( fetcher, task->item, true ); out: + vlc_interrupt_set(NULL); + if( psz_arturl ) { - var_SetAddress( fetcher->owner, "item-change", req->item ); - input_item_SetArtFetched( req->item, true ); + var_SetAddress( fetcher->owner, "item-change", task->item ); + input_item_SetArtFetched( task->item, true ); } free( psz_arturl ); - NotifyArtFetchEnded(req, psz_arturl != NULL); + NotifyArtFetchEnded(task, psz_arturl != NULL); + FetcherRemoveTask(fetcher, task); + TaskDelete(task); return; error: + vlc_interrupt_set(NULL); + FREENULL( psz_arturl ); + NotifyArtFetchEnded(task, false); + FetcherRemoveTask(fetcher, task); + TaskDelete(task); goto out; } -static void SearchLocal( input_fetcher_t* fetcher, struct fetcher_request* req ) +static void RunSearchLocal(void *userdata) { - if( SearchByScope( fetcher, req, FETCHER_SCOPE_LOCAL ) == VLC_SUCCESS ) - return; /* done */ + struct task *task = userdata; + input_fetcher_t *fetcher = task->fetcher; + + vlc_interrupt_set(&task->interrupt); + + if( SearchByScope( task, FETCHER_SCOPE_LOCAL ) == VLC_SUCCESS ) + goto end; /* done */ if( var_InheritBool( fetcher->owner, "metadata-network-access" ) || - req->options & META_REQUEST_OPTION_FETCH_NETWORK ) + task->options & META_REQUEST_OPTION_FETCH_NETWORK ) { - if( background_worker_Push( fetcher->network, req, NULL, 0 ) ) - NotifyArtFetchEnded(req, false); + int ret = Submit(fetcher, fetcher->executor_network, task->item, + task->options, task->cbs, task->userdata); + if (ret != VLC_SUCCESS) + NotifyArtFetchEnded(task, false); } else { - input_item_SetArtNotFound( req->item, true ); - NotifyArtFetchEnded(req, false); + input_item_SetArtNotFound( task->item, true ); + NotifyArtFetchEnded(task, false); } -} -static void SearchNetwork( input_fetcher_t* fetcher, struct fetcher_request* req ) -{ - if( SearchByScope( fetcher, req, FETCHER_SCOPE_NETWORK ) ) - { - input_item_SetArtNotFound( req->item, true ); - NotifyArtFetchEnded(req, false); - } -} - -static void RequestRelease( void* req_ ) -{ - struct fetcher_request* req = req_; +end: + vlc_interrupt_set(NULL); - if( !vlc_atomic_rc_dec( &req->rc ) ) - return; - - input_item_Release( req->item ); - free( req ); -} - -static void RequestHold( void* req_ ) -{ - struct fetcher_request* req = req_; - vlc_atomic_rc_inc( &req->rc ); -} - -static void* FetcherThread( void* handle ) -{ - struct fetcher_thread* th = handle; - vlc_interrupt_set( &th->interrupt ); - - th->pf_worker( th->fetcher, th->req ); - - atomic_store( &th->active, false ); - background_worker_RequestProbe( th->worker ); - return NULL; + FetcherRemoveTask(fetcher, task); + TaskDelete(task); } -static int StartWorker( input_fetcher_t* fetcher, - void( *pf_worker )( input_fetcher_t*, struct fetcher_request* ), - struct background_worker* bg, struct fetcher_request* req, void** handle ) +static void RunSearchNetwork(void *userdata) { - struct fetcher_thread* th = malloc( sizeof *th ); - - if( unlikely( !th ) ) - return VLC_ENOMEM; - - th->req = req; - th->worker = bg; - th->fetcher = fetcher; - th->pf_worker = pf_worker; + struct task *task = userdata; - vlc_interrupt_init( &th->interrupt ); - atomic_init( &th->active, true ); + vlc_interrupt_set(&task->interrupt); - if( !vlc_clone( &th->thread, FetcherThread, th, VLC_THREAD_PRIORITY_LOW ) ) + if( SearchByScope( task, FETCHER_SCOPE_NETWORK ) != VLC_SUCCESS ) { - *handle = th; - return VLC_SUCCESS; + input_item_SetArtNotFound( task->item, true ); + NotifyArtFetchEnded(task, false); } - vlc_interrupt_deinit( &th->interrupt ); - free( th ); - return VLC_EGENERIC; -} + vlc_interrupt_set(NULL); -static int ProbeWorker( void* fetcher_, void* th_ ) -{ - return !atomic_load( &((struct fetcher_thread*)th_)->active ); - VLC_UNUSED( fetcher_ ); -} - -static void CloseWorker( void* fetcher_, void* th_ ) -{ - struct fetcher_thread* th = th_; - VLC_UNUSED( fetcher_ ); - - vlc_interrupt_kill( &th->interrupt ); - vlc_join( th->thread, NULL ); - vlc_interrupt_deinit( &th->interrupt ); - free( th ); -} - -#define DEF_STARTER(name, worker) \ -static int Start ## name( void* fetcher_, void* req_, void** out ) { \ - input_fetcher_t* fetcher = fetcher_; \ - return StartWorker( fetcher, name, worker, req_, out ); } - -DEF_STARTER( SearchLocal, fetcher->local ) -DEF_STARTER(SearchNetwork, fetcher->network ) -DEF_STARTER( Downloader, fetcher->downloader ) - -static void WorkerInit( input_fetcher_t* fetcher, - struct background_worker** worker, int( *starter )( void*, void*, void** ) ) -{ - struct background_worker_config conf = { - .default_timeout = 0, - .max_threads = var_InheritInteger( fetcher->owner, "fetch-art-threads" ), - .pf_start = starter, - .pf_probe = ProbeWorker, - .pf_stop = CloseWorker, - .pf_release = RequestRelease, - .pf_hold = RequestHold }; - - *worker = background_worker_New( fetcher, &conf ); + input_fetcher_t *fetcher = task->fetcher; + FetcherRemoveTask(fetcher, task); + TaskDelete(task); } input_fetcher_t* input_fetcher_New( vlc_object_t* owner ) @@ -420,65 +435,85 @@ input_fetcher_t* input_fetcher_New( vlc_object_t* owner ) if( unlikely( !fetcher ) ) return NULL; - fetcher->owner = owner; - - WorkerInit( fetcher, &fetcher->local, StartSearchLocal ); - WorkerInit( fetcher, &fetcher->network, StartSearchNetwork ); - WorkerInit( fetcher, &fetcher->downloader, StartDownloader ); + int max_threads = var_InheritInteger(owner, "fetch-art-threads"); + if (max_threads < 1) + max_threads = 1; - if( unlikely( !fetcher->local || !fetcher->network || !fetcher->downloader ) ) + fetcher->executor_local = vlc_executor_New(max_threads); + if (!fetcher->executor_local) { - if( fetcher->local ) - background_worker_Delete( fetcher->local ); - - if( fetcher->network ) - background_worker_Delete( fetcher->network ); + free(fetcher); + return NULL; + } - if( fetcher->downloader ) - background_worker_Delete( fetcher->downloader ); + fetcher->executor_network = vlc_executor_New(max_threads); + if (!fetcher->executor_network) + { + vlc_executor_Delete(fetcher->executor_local); + free(fetcher); + return NULL; + } - free( fetcher ); + fetcher->executor_downloader = vlc_executor_New(max_threads); + if (!fetcher->executor_downloader) + { + vlc_executor_Delete(fetcher->executor_network); + vlc_executor_Delete(fetcher->executor_local); + free(fetcher); return NULL; } - vlc_mutex_init( &fetcher->lock ); + fetcher->owner = owner; + + vlc_mutex_init(&fetcher->lock); + vlc_list_init(&fetcher->submitted_tasks); + vlc_dictionary_init( &fetcher->album_cache, 0 ); return fetcher; } -int input_fetcher_Push( input_fetcher_t* fetcher, input_item_t* item, +int input_fetcher_Push(input_fetcher_t* fetcher, input_item_t* item, input_item_meta_request_option_t options, - const input_fetcher_callbacks_t *cbs, void *cbs_userdata ) + const input_fetcher_callbacks_t *cbs, void *cbs_userdata) { assert(options & META_REQUEST_OPTION_FETCH_ANY); - struct fetcher_request* req = malloc( sizeof *req ); - if( unlikely( !req ) ) - return VLC_ENOMEM; + vlc_executor_t *executor = options & META_REQUEST_OPTION_FETCH_LOCAL + ? fetcher->executor_local + : fetcher->executor_network; - req->item = item; - req->options = options; - req->cbs = cbs; - req->userdata = cbs_userdata; + return Submit(fetcher, executor, item, options, cbs, cbs_userdata); +} - vlc_atomic_rc_init( &req->rc ); - input_item_Hold( item ); +static void +CancelAllTasks(input_fetcher_t *fetcher) +{ + vlc_mutex_lock(&fetcher->lock); - struct background_worker* worker = - options & META_REQUEST_OPTION_FETCH_LOCAL ? fetcher->local : fetcher->network; - if( background_worker_Push( worker, req, NULL, 0 ) ) - NotifyArtFetchEnded(req, false); + struct task *task; + vlc_list_foreach(task, &fetcher->submitted_tasks, node) + { + bool canceled = vlc_executor_Cancel(task->executor, &task->runnable); + if (canceled) + { + NotifyArtFetchEnded(task, false); + vlc_list_remove(&task->node); + TaskDelete(task); + } + /* Otherwise, the task will be finished and destroyed after run() */ + } - RequestRelease( req ); - return VLC_SUCCESS; + vlc_mutex_unlock(&fetcher->lock); } void input_fetcher_Delete( input_fetcher_t* fetcher ) { - background_worker_Delete( fetcher->local ); - background_worker_Delete( fetcher->network ); - background_worker_Delete( fetcher->downloader ); + CancelAllTasks(fetcher); + + vlc_executor_Delete(fetcher->executor_local); + vlc_executor_Delete(fetcher->executor_network); + vlc_executor_Delete(fetcher->executor_downloader); vlc_dictionary_clear( &fetcher->album_cache, FreeCacheEntry, NULL ); free( fetcher ); _______________________________________________ vlc-commits mailing list [email protected] https://mailman.videolan.org/listinfo/vlc-commits
