vlc | branch: master | Romain Vimont <[email protected]> | Tue Sep 1 18:09:09 2020 +0200| [5386f8eea902906c2e359f25976847a9745f4648] | committer: Alexandre Janniaux
preparser: use vlc_executor_t Replace the background worker by an executor. Signed-off-by: Alexandre Janniaux <[email protected]> > http://git.videolan.org/gitweb.cgi/vlc.git/?a=commit;h=5386f8eea902906c2e359f25976847a9745f4648 --- src/preparser/preparser.c | 401 +++++++++++++++++++++++++++++----------------- 1 file changed, 251 insertions(+), 150 deletions(-) diff --git a/src/preparser/preparser.c b/src/preparser/preparser.c index 723feb96dc..e5bd0b87cb 100644 --- a/src/preparser/preparser.c +++ b/src/preparser/preparser.c @@ -24,8 +24,8 @@ #include <vlc_common.h> #include <vlc_atomic.h> +#include <vlc_executor.h> -#include "misc/background_worker.h" #include "input/input_interface.h" #include "input/input_internal.h" #include "preparser.h" @@ -35,222 +35,299 @@ struct input_preparser_t { vlc_object_t* owner; input_fetcher_t* fetcher; - struct background_worker* worker; + vlc_executor_t *executor; + vlc_tick_t default_timeout; atomic_bool deactivated; + + vlc_mutex_t lock; + struct vlc_list submitted_tasks; /**< list of struct task */ }; -typedef struct input_preparser_req_t +struct task { + input_preparser_t *preparser; input_item_t *item; input_item_meta_request_option_t options; const input_preparser_callbacks_t *cbs; void *userdata; - vlc_atomic_rc_t rc; -} input_preparser_req_t; + void *id; + vlc_tick_t timeout; -typedef struct input_preparser_task_t -{ - input_preparser_req_t *req; - input_preparser_t* preparser; - int preparse_status; input_item_parser_id_t *parser; - atomic_int state; - atomic_bool done; -} input_preparser_task_t; - -static input_preparser_req_t *ReqCreate(input_item_t *item, - input_item_meta_request_option_t options, - const input_preparser_callbacks_t *cbs, - void *userdata) + + vlc_mutex_t lock; + vlc_cond_t cond_ended; + bool preparse_ended; + int preparse_status; + bool fetch_ended; + + atomic_bool interrupted; + + struct vlc_runnable runnable; /**< to be passed to the executor */ + + struct vlc_list node; /**< node of input_preparser_t.submitted_tasks */ +}; + +static void RunnableRun(void *); + +static struct task * +TaskNew(input_preparser_t *preparser, input_item_t *item, + input_item_meta_request_option_t options, + const input_preparser_callbacks_t *cbs, void *userdata, + void *id, vlc_tick_t timeout) { - input_preparser_req_t *req = malloc(sizeof(*req)); - if (unlikely(!req)) + assert(timeout >= 0); + + struct task *task = malloc(sizeof(*task)); + if (!task) return NULL; - req->item = item; - req->options = options; - req->cbs = cbs; - req->userdata = userdata; - vlc_atomic_rc_init(&req->rc); + task->preparser = preparser; + task->item = item; + task->options = options; + task->cbs = cbs; + task->userdata = userdata; + task->id = id; + task->timeout = timeout; input_item_Hold(item); - return req; + task->parser = NULL; + vlc_mutex_init(&task->lock); + vlc_cond_init(&task->cond_ended); + task->preparse_ended = false; + task->preparse_status = ITEM_PREPARSE_SKIPPED; + task->fetch_ended = false; + + atomic_init(&task->interrupted, false); + + task->runnable.run = RunnableRun; + task->runnable.userdata = task; + + return task; } -static void ReqHold(input_preparser_req_t *req) +static void +TaskDelete(struct task *task) { - vlc_atomic_rc_inc(&req->rc); + input_item_Release(task->item); + free(task); } -static void ReqRelease(input_preparser_req_t *req) +static void +PreparserAddTask(input_preparser_t *preparser, struct task *task) { - if (vlc_atomic_rc_dec(&req->rc)) - { - input_item_Release(req->item); - free(req); - } + vlc_mutex_lock(&preparser->lock); + vlc_list_append(&task->node, &preparser->submitted_tasks); + vlc_mutex_unlock(&preparser->lock); } -static void OnParserEnded(input_item_t *item, int status, void *task_) +static void +PreparserRemoveTask(input_preparser_t *preparser, struct task *task) { - VLC_UNUSED(item); - input_preparser_task_t* task = task_; - - atomic_store( &task->state, status ); - atomic_store( &task->done, true ); - background_worker_RequestProbe( task->preparser->worker ); + vlc_mutex_lock(&preparser->lock); + vlc_list_remove(&task->node); + vlc_mutex_unlock(&preparser->lock); } -static void OnParserSubtreeAdded(input_item_t *item, input_item_node_t *subtree, - void *task_) +static void +NotifyPreparseEnded(struct task *task) { - VLC_UNUSED(item); - input_preparser_task_t* task = task_; - input_preparser_req_t *req = task->req; - - if (req->cbs && req->cbs->on_subtree_added) - req->cbs->on_subtree_added(req->item, subtree, req->userdata); + if (task->cbs && task->cbs->on_preparse_ended) + task->cbs->on_preparse_ended(task->item, task->preparse_status, + task->userdata); } -static int PreparserOpenInput( void* preparser_, void* req_, void** out ) +static void +OnParserEnded(input_item_t *item, int status, void *task_) { - input_preparser_t* preparser = preparser_; - input_preparser_req_t *req = req_; - input_preparser_task_t* task = malloc( sizeof *task ); - - if( unlikely( !task ) ) - goto error; - - static const input_item_parser_cbs_t cbs = { - .on_ended = OnParserEnded, - .on_subtree_added = OnParserSubtreeAdded, - }; - - atomic_init( &task->state, VLC_ETIMEOUT ); - atomic_init( &task->done, false ); - - task->preparser = preparser_; - task->req = req; - task->preparse_status = -1; - task->parser = input_item_Parse( req->item, preparser->owner, &cbs, - task ); - if( !task->parser ) - goto error; + VLC_UNUSED(item); + struct task *task = task_; - *out = task; + if (atomic_load(&task->interrupted)) + /* + * On interruption, the call to input_item_parser_id_Release() may + * trigger this "parser ended" callback. Ignore it. + */ + return; - return VLC_SUCCESS; + vlc_mutex_lock(&task->lock); + assert(!task->preparse_ended); + task->preparse_status = status == VLC_SUCCESS ? ITEM_PREPARSE_DONE + : ITEM_PREPARSE_FAILED; + task->preparse_ended = true; + vlc_mutex_unlock(&task->lock); -error: - free( task ); - if (req->cbs && req->cbs->on_preparse_ended) - req->cbs->on_preparse_ended(req->item, ITEM_PREPARSE_FAILED, req->userdata); - return VLC_EGENERIC; + vlc_cond_signal(&task->cond_ended); } -static int PreparserProbeInput( void* preparser_, void* task_ ) +static void +OnParserSubtreeAdded(input_item_t *item, input_item_node_t *subtree, + void *task_) { - input_preparser_task_t* task = task_; - return atomic_load( &task->done ); - VLC_UNUSED( preparser_ ); + VLC_UNUSED(item); + struct task *task = task_; + + if (task->cbs && task->cbs->on_subtree_added) + task->cbs->on_subtree_added(task->item, subtree, task->userdata); } -static void on_art_fetch_ended(input_item_t *item, bool fetched, void *userdata) +static void +OnArtFetchEnded(input_item_t *item, bool fetched, void *userdata) { VLC_UNUSED(item); VLC_UNUSED(fetched); - input_preparser_task_t *task = userdata; - input_preparser_req_t *req = task->req; - input_item_SetPreparsed(req->item, true); + struct task *task = userdata; - if (req->cbs && req->cbs->on_preparse_ended) - req->cbs->on_preparse_ended(req->item, task->preparse_status, req->userdata); + vlc_mutex_lock(&task->lock); + assert(!task->fetch_ended); + task->fetch_ended = true; + vlc_mutex_unlock(&task->lock); - ReqRelease(req); - free(task); + vlc_cond_signal(&task->cond_ended); } static const input_fetcher_callbacks_t input_fetcher_callbacks = { - .on_art_fetch_ended = on_art_fetch_ended, + .on_art_fetch_ended = OnArtFetchEnded, }; -static void PreparserCloseInput( void* preparser_, void* task_ ) +static void +Parse(struct task *task, vlc_tick_t deadline) { - input_preparser_task_t* task = task_; - input_preparser_req_t *req = task->req; - - input_preparser_t* preparser = preparser_; - input_item_t* item = req->item; + static const input_item_parser_cbs_t cbs = { + .on_ended = OnParserEnded, + .on_subtree_added = OnParserSubtreeAdded, + }; - int status; - switch( atomic_load( &task->state ) ) + vlc_object_t *obj = task->preparser->owner; + task->parser = input_item_Parse(task->item, obj, &cbs, task); + if (!task->parser) { - case VLC_SUCCESS: - status = ITEM_PREPARSE_DONE; - break; - case VLC_ETIMEOUT: - status = ITEM_PREPARSE_TIMEOUT; - break; - default: - status = ITEM_PREPARSE_FAILED; - break; + task->preparse_status = ITEM_PREPARSE_FAILED; + return; } - input_item_parser_id_Release( task->parser ); - - if( preparser->fetcher && (req->options & META_REQUEST_OPTION_FETCH_ANY) ) + /* Wait until the end of parsing */ + vlc_mutex_lock(&task->lock); + if (deadline == VLC_TICK_INVALID) + { + while (!task->preparse_ended) + vlc_cond_wait(&task->cond_ended, &task->lock); + } + else { - task->preparse_status = status; - ReqHold(task->req); - if (!input_fetcher_Push(preparser->fetcher, item, - req->options & META_REQUEST_OPTION_FETCH_ANY, - &input_fetcher_callbacks, task)) + bool timeout = false; + while (!task->preparse_ended && !timeout) + timeout = + vlc_cond_timedwait(&task->cond_ended, &task->lock, deadline); + + if (timeout) { - return; + task->preparse_status = ITEM_PREPARSE_TIMEOUT; + atomic_store(&task->interrupted, true); } - ReqRelease(task->req); } + vlc_mutex_unlock(&task->lock); - free(task); + /* This call also interrupts the parsing if it is still running */ + input_item_parser_id_Release(task->parser); +} + +static void +Fetch(struct task *task) +{ + input_fetcher_t *fetcher = task->preparser->fetcher; + if (!fetcher || !(task->options & META_REQUEST_OPTION_FETCH_ANY)) + return; + + int ret = + input_fetcher_Push(fetcher, task->item, + task->options & META_REQUEST_OPTION_FETCH_ANY, + &input_fetcher_callbacks, task); + if (ret != VLC_SUCCESS) + return; + + /* Wait until the end of fetching (fetching is not interruptible) */ + vlc_mutex_lock(&task->lock); + while (!task->fetch_ended) + vlc_cond_wait(&task->cond_ended, &task->lock); + vlc_mutex_unlock(&task->lock); +} + +static void +RunnableRun(void *userdata) +{ + struct task *task = userdata; + + vlc_tick_t deadline = task->timeout ? vlc_tick_now() + task->timeout + : VLC_TICK_INVALID; + + if (atomic_load(&task->interrupted)) + goto end; - input_item_SetPreparsed( item, true ); - if (req->cbs && req->cbs->on_preparse_ended) - req->cbs->on_preparse_ended(req->item, status, req->userdata); + Parse(task, deadline); + + if (atomic_load(&task->interrupted)) + goto end; + + Fetch(task); + + if (atomic_load(&task->interrupted)) + goto end; + + input_item_SetPreparsed(task->item, true); + +end: + NotifyPreparseEnded(task); + input_preparser_t *preparser = task->preparser; + PreparserRemoveTask(preparser, task); + TaskDelete(task); } -static void ReqHoldVoid(void *item) { ReqHold(item); } -static void ReqReleaseVoid(void *item) { ReqRelease(item); } +static void +Interrupt(struct task *task) +{ + assert(!atomic_load(&task->interrupted)); + atomic_store(&task->interrupted, true); + + /* Wake up the preparser cond_wait */ + vlc_mutex_lock(&task->lock); + task->preparse_status = ITEM_PREPARSE_TIMEOUT; + task->preparse_ended = true; + vlc_mutex_unlock(&task->lock); + vlc_cond_signal(&task->cond_ended); +} input_preparser_t* input_preparser_New( vlc_object_t *parent ) { input_preparser_t* preparser = malloc( sizeof *preparser ); + if (!preparser) + return NULL; - struct background_worker_config conf = { - .default_timeout = VLC_TICK_FROM_MS(var_InheritInteger( parent, "preparse-timeout" )), - .max_threads = var_InheritInteger( parent, "preparse-threads" ), - .pf_start = PreparserOpenInput, - .pf_probe = PreparserProbeInput, - .pf_stop = PreparserCloseInput, - .pf_release = ReqReleaseVoid, - .pf_hold = ReqHoldVoid - }; - - - if( likely( preparser ) ) - preparser->worker = background_worker_New( preparser, &conf ); + int max_threads = var_InheritInteger(parent, "preparse-threads"); + if (max_threads < 1) + max_threads = 1; - if( unlikely( !preparser || !preparser->worker ) ) + preparser->executor = vlc_executor_New(max_threads); + if (!preparser->executor) { - free( preparser ); + free(preparser); return NULL; } + preparser->default_timeout = + VLC_TICK_FROM_MS(var_InheritInteger(parent, "preparse-timeout")); + if (preparser->default_timeout < 0) + preparser->default_timeout = 0; + preparser->owner = parent; preparser->fetcher = input_fetcher_New( parent ); atomic_init( &preparser->deactivated, false ); + vlc_mutex_init(&preparser->lock); + vlc_list_init(&preparser->submitted_tasks); + if( unlikely( !preparser->fetcher ) ) msg_Warn( parent, "unable to create art fetcher" ); @@ -260,7 +337,7 @@ input_preparser_t* input_preparser_New( vlc_object_t *parent ) void input_preparser_Push( input_preparser_t *preparser, input_item_t *item, input_item_meta_request_option_t i_options, const input_preparser_callbacks_t *cbs, void *cbs_userdata, - int timeout, void *id ) + int timeout_ms, void *id ) { if( atomic_load( &preparser->deactivated ) ) return; @@ -287,14 +364,14 @@ void input_preparser_Push( input_preparser_t *preparser, return; } - struct input_preparser_req_t *req = ReqCreate(item, i_options, - cbs, cbs_userdata); + vlc_tick_t timeout = timeout_ms == -1 ? preparser->default_timeout + : VLC_TICK_FROM_MS(timeout_ms); + struct task *task = + TaskNew(preparser, item, i_options, cbs, cbs_userdata, id, timeout); - if (background_worker_Push(preparser->worker, req, id, timeout)) - if (req->cbs && cbs->on_preparse_ended) - cbs->on_preparse_ended(item, ITEM_PREPARSE_FAILED, cbs_userdata); + PreparserAddTask(preparser, task); - ReqRelease(req); + vlc_executor_Submit(preparser->executor, &task->runnable); } void input_preparser_fetcher_Push( input_preparser_t *preparser, @@ -308,18 +385,42 @@ void input_preparser_fetcher_Push( input_preparser_t *preparser, void input_preparser_Cancel( input_preparser_t *preparser, void *id ) { - background_worker_Cancel( preparser->worker, id ); + vlc_mutex_lock(&preparser->lock); + + struct task *task; + vlc_list_foreach(task, &preparser->submitted_tasks, node) + { + if (!id || task->id == id) + { + bool canceled = + vlc_executor_Cancel(preparser->executor, &task->runnable); + if (canceled) + { + NotifyPreparseEnded(task); + vlc_list_remove(&task->node); + TaskDelete(task); + } + else + /* The task will be finished and destroyed after run() */ + Interrupt(task); + } + } + + vlc_mutex_unlock(&preparser->lock); } void input_preparser_Deactivate( input_preparser_t* preparser ) { atomic_store( &preparser->deactivated, true ); - background_worker_Cancel( preparser->worker, NULL ); + input_preparser_Cancel(preparser, NULL); } void input_preparser_Delete( input_preparser_t *preparser ) { - background_worker_Delete( preparser->worker ); + /* In case input_preparser_Deactivate() has not been called */ + input_preparser_Cancel(preparser, NULL); + + vlc_executor_Delete(preparser->executor); if( preparser->fetcher ) input_fetcher_Delete( preparser->fetcher ); _______________________________________________ vlc-commits mailing list [email protected] https://mailman.videolan.org/listinfo/vlc-commits
