I need someone to review my fix for MDEV-7818 below. It involves the FLUSH TABLES WITH READ LOCK (FTWRL) command and the associated metadata locking, something I know little about. Monty suggested the basic semantics: all currently running parallel replication transactions are allowed to complete before the global read lock is granted. But the actual implementation really needs a review by someone who understands the FTWRL semantics and implementation.
Thanks, - Kristian. Kristian Nielsen <[email protected]> writes: > revision-id: 0e609daea4796fafca94f7adf796addfc506168b > parent(s): 22b6c297dc0de6b176238ae820bddb5e70f2c8ab > committer: Kristian Nielsen > branch nick: mariadb > timestamp: 2015-06-09 16:01:53 +0200 > message: > > MDEV-7818: Deadlock occurring with parallel replication and FTWRL > > Problem is that FLUSH TABLES WITH READ LOCK first blocks threads from > starting new commits, then waits for running commits to complete. But > in-order parallel replication needs commits to happen in a particular > order, so this can easily deadlock. > > To fix this problem, this patch introduces a way to temporarily pause > the parallel replication worker threads. Before starting FTWRL, we let > all worker threads complete in-progress transactions, and then > wait. Then we proceed to take the global read lock. Once the lock is > obtained, we unpause the worker threads. Now commits are blocked from > starting by the global read lock, so the deadlock will no longer occur. 
> > --- > sql/mysqld.cc | 3 + > sql/mysqld.h | 3 + > sql/rpl_parallel.cc | 230 > ++++++++++++++++++++++++++++++++++++++++++++++++++-- > sql/rpl_parallel.h | 27 ++++-- > sql/sql_parse.cc | 13 +++ > 5 files changed, 262 insertions(+), 14 deletions(-) > > diff --git a/sql/mysqld.cc b/sql/mysqld.cc > index e05c0b6..af8ae2c 100644 > --- a/sql/mysqld.cc > +++ b/sql/mysqld.cc > @@ -9514,6 +9514,9 @@ PSI_stage_info > stage_waiting_for_prior_transaction_to_commit= { 0, "Waiting for > PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit= { 0, > "Waiting for prior transaction to start commit before starting next > transaction", 0}; > PSI_stage_info stage_waiting_for_room_in_worker_thread= { 0, "Waiting for > room in worker thread event queue", 0}; > PSI_stage_info stage_waiting_for_workers_idle= { 0, "Waiting for worker > threads to be idle", 0}; > +PSI_stage_info stage_waiting_for_ftwrl= { 0, "Waiting due to global read > lock", 0}; > +PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause= { 0, "Waiting for > worker threads to pause for global read lock", 0}; > +PSI_stage_info stage_waiting_for_rpl_thread_pool= { 0, "Waiting while > replication worker thread pool is busy", 0}; > PSI_stage_info stage_master_gtid_wait_primary= { 0, "Waiting in > MASTER_GTID_WAIT() (primary waiter)", 0}; > PSI_stage_info stage_master_gtid_wait= { 0, "Waiting in MASTER_GTID_WAIT()", > 0}; > PSI_stage_info stage_gtid_wait_other_connection= { 0, "Waiting for other > master connection to process GTID received on multiple master connections", > 0}; > diff --git a/sql/mysqld.h b/sql/mysqld.h > index 156e7f9..8c953c1 100644 > --- a/sql/mysqld.h > +++ b/sql/mysqld.h > @@ -454,6 +454,9 @@ extern PSI_stage_info > stage_waiting_for_prior_transaction_to_commit; > extern PSI_stage_info stage_waiting_for_prior_transaction_to_start_commit; > extern PSI_stage_info stage_waiting_for_room_in_worker_thread; > extern PSI_stage_info stage_waiting_for_workers_idle; > +extern PSI_stage_info 
stage_waiting_for_ftwrl; > +extern PSI_stage_info stage_waiting_for_ftwrl_threads_to_pause; > +extern PSI_stage_info stage_waiting_for_rpl_thread_pool; > extern PSI_stage_info stage_master_gtid_wait_primary; > extern PSI_stage_info stage_master_gtid_wait; > extern PSI_stage_info stage_gtid_wait_other_connection; > diff --git a/sql/rpl_parallel.cc b/sql/rpl_parallel.cc > index 738d0e5..d90507a 100644 > --- a/sql/rpl_parallel.cc > +++ b/sql/rpl_parallel.cc > @@ -280,6 +280,8 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer > *gco, > rpl_parallel_entry *entry= rgi->parallel_entry; > uint64 wait_count; > > + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); > + > if (!gco->installed) > { > group_commit_orderer *prev_gco= gco->prev_gco; > @@ -336,6 +338,159 @@ do_gco_wait(rpl_group_info *rgi, group_commit_orderer > *gco, > } > > > +static void > +do_ftwrl_wait(rpl_group_info *rgi, > + bool *did_enter_cond, PSI_stage_info *old_stage) > +{ > + THD *thd= rgi->thd; > + rpl_parallel_entry *entry= rgi->parallel_entry; > + uint64 sub_id= rgi->gtid_sub_id; > + > + mysql_mutex_assert_owner(&entry->LOCK_parallel_entry); > + > + if (unlikely(entry->pause_sub_id > 0) && sub_id > entry->pause_sub_id) > + { > + thd->ENTER_COND(&entry->COND_parallel_entry, &entry->LOCK_parallel_entry, > + &stage_waiting_for_ftwrl, old_stage); > + *did_enter_cond= true; > + do > + { > + if (entry->force_abort || rgi->worker_error) > + break; > + if (thd->check_killed()) > + { > + thd->send_kill_message(); > + slave_output_error_info(rgi, thd); > + signal_error_to_sql_driver_thread(thd, rgi, 1); > + break; > + } > + mysql_cond_wait(&entry->COND_parallel_entry, > &entry->LOCK_parallel_entry); > + } while (entry->pause_sub_id > 0 && sub_id > entry->pause_sub_id); > + } > + > + if (sub_id > entry->largest_started_sub_id) > + entry->largest_started_sub_id= sub_id; > +} > + > + > +static void > +pool_mark_busy(rpl_parallel_thread_pool *pool) > +{ > + 
mysql_mutex_assert_owner(&pool->LOCK_rpl_thread_pool); > + DBUG_ASSERT(!pool->busy); > + pool->busy= true; > +} > + > + > +static void > +pool_mark_not_busy(rpl_parallel_thread_pool *pool) > +{ > + mysql_mutex_assert_owner(&pool->LOCK_rpl_thread_pool); > + DBUG_ASSERT(pool->busy); > + pool->busy= false; > + mysql_cond_broadcast(&pool->COND_rpl_thread_pool); > +} > + > + > +void > +rpl_unpause_for_ftwrl(THD *thd) > +{ > + uint32 i; > + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; > + > + DBUG_ASSERT(pool->busy); > + > + for (i= 0; i < pool->count; ++i) > + { > + rpl_parallel_entry *e; > + rpl_parallel_thread *rpt= pool->threads[i]; > + > + mysql_mutex_lock(&rpt->LOCK_rpl_thread); > + if (!rpt->current_owner) > + { > + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); > + continue; > + } > + e= rpt->current_entry; > + mysql_mutex_lock(&e->LOCK_parallel_entry); > + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); > + e->pause_sub_id= 0; > + mysql_cond_broadcast(&e->COND_parallel_entry); > + } > + > + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); > + pool_mark_not_busy(pool); > + mysql_cond_broadcast(&pool->COND_rpl_thread_pool); > + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); > +} > + > + > +/* > + Pause the parallel replication worker threads for FLUSH TABLES WITH READ > + LOCK: let each active worker finish the transactions it has already > + started, and make it wait before starting any later ones. On success, > + the caller must later call rpl_unpause_for_ftwrl(). > + > + Note: in case of error return, rpl_unpause_for_ftwrl() must _not_ be > + called (the pause has already been undone). > +*/ > +int > +rpl_pause_for_ftwrl(THD *thd) > +{ > + uint32 i; > + rpl_parallel_thread_pool *pool= &global_rpl_thread_pool; > + int err= 0; > + > + /* > + While pool->busy is set, the pool cannot be shut down or resized, so > + threads are guaranteed to not disappear. > + > + This is required to safely be able to access the individual threads > below. > + (We cannot lock an individual thread while holding LOCK_rpl_thread_pool, > + as this can deadlock against release_thread()).
> + */ > + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); > + pool_mark_busy(pool); > + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); > + > + for (i= 0; i < pool->count; ++i) > + { > + PSI_stage_info old_stage; > + rpl_parallel_entry *e; > + rpl_parallel_thread *rpt= pool->threads[i]; > + > + mysql_mutex_lock(&rpt->LOCK_rpl_thread); > + if (!rpt->current_owner) > + { > + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); > + continue; > + } > + e= rpt->current_entry; > + mysql_mutex_lock(&e->LOCK_parallel_entry); > + mysql_mutex_unlock(&rpt->LOCK_rpl_thread); > + ++e->need_sub_id_signal; > + if (!e->pause_sub_id) > + e->pause_sub_id= e->largest_started_sub_id; > + thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, > + &stage_waiting_for_ftwrl_threads_to_pause, &old_stage); > + while (e->last_committed_sub_id < e->pause_sub_id && !err) > + { > + if (thd->check_killed()) > + { > + thd->send_kill_message(); > + err= 1; > + break; > + } > + mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); > + }; > + --e->need_sub_id_signal; > + thd->EXIT_COND(&old_stage); > + if (err) > + break; > + } > + > + if (err) > + rpl_unpause_for_ftwrl(thd); > + return err; > +} > + > + > #ifndef DBUG_OFF > static int > dbug_simulate_tmp_error(rpl_group_info *rgi, THD *thd) > @@ -856,6 +1011,9 @@ handle_rpl_parallel_thread(void *arg) > > if (unlikely(entry->stop_on_error_sub_id <= rgi->wait_commit_sub_id)) > skip_event_group= true; > + if (likely(!skip_event_group)) > + do_ftwrl_wait(rgi, &did_enter_cond, &old_stage); > + > register_wait_for_prior_event_group_commit(rgi, entry); > > unlock_or_exit_cond(thd, &entry->LOCK_parallel_entry, > @@ -1058,6 +1216,47 @@ > rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, > rpl_parallel_thread **new_list= NULL; > rpl_parallel_thread *new_free_list= NULL; > rpl_parallel_thread *rpt_array= NULL; > + THD *thd; > + PSI_stage_info old_stage; > + int res; > + > + /* > + Wait here while the queue is busy. 
This is done to make FLUSH TABLES WITH > + READ LOCK work correctly, without incurring extra locking penalties in > + normal operation. FLUSH TABLES WITH READ LOCK needs to lock threads in > the > + thread pool, and for this we need to make sure the pool will not go away > + during the operation. The LOCK_rpl_thread_pool is not suitable for > + this. It is taken by release_thread() while holding LOCK_rpl_thread; so > it > + must be released before locking any LOCK_rpl_thread lock, or a deadlock > + can occur. > + > + So we protect the infrequent operations of FLUSH TABLES WITH READ LOCK > and > + pool size changes with this condition wait. > + */ > + thd= current_thd; > + res= 0; > + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); > + if (thd) > + thd->ENTER_COND(&pool->COND_rpl_thread_pool, &pool->LOCK_rpl_thread_pool, > + &stage_waiting_for_rpl_thread_pool, &old_stage); > + while (pool->busy) > + { > + if (thd->check_killed()) > + { > + thd->send_kill_message(); > + res= 1; > + break; > + } > + mysql_cond_wait(&pool->COND_rpl_thread_pool, > &pool->LOCK_rpl_thread_pool); > + } > + if (!res) > + pool_mark_busy(pool); > + if (thd) > + thd->EXIT_COND(&old_stage); > + else > + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); > + if (res) > + return res; > > /* > Allocate the new list of threads up-front.
> @@ -1106,7 +1305,14 @@ > rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, > */ > for (i= 0; i < pool->count; ++i) > { > - rpl_parallel_thread *rpt= pool->get_thread(NULL, NULL); > + rpl_parallel_thread *rpt; > + > + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); > + while ((rpt= pool->free_list) == NULL) > + mysql_cond_wait(&pool->COND_rpl_thread_pool, > &pool->LOCK_rpl_thread_pool); > + pool->free_list= rpt->next; > + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); > + mysql_mutex_lock(&rpt->LOCK_rpl_thread); > rpt->stop= true; > mysql_cond_signal(&rpt->COND_rpl_thread); > mysql_mutex_unlock(&rpt->LOCK_rpl_thread); > @@ -1157,7 +1363,7 @@ > rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, > } > > mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); > - mysql_cond_broadcast(&pool->COND_rpl_thread_pool); > + pool_mark_not_busy(pool); > mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); > > return 0; > @@ -1182,6 +1388,9 @@ > rpl_parallel_change_thread_count(rpl_parallel_thread_pool *pool, > } > my_free(new_list); > } > + mysql_mutex_lock(&pool->LOCK_rpl_thread_pool); > + pool_mark_not_busy(pool); > + mysql_mutex_unlock(&pool->LOCK_rpl_thread_pool); > return 1; > } > > @@ -1445,7 +1654,7 @@ rpl_parallel_thread::loc_free_gco(group_commit_orderer > *gco) > > > rpl_parallel_thread_pool::rpl_parallel_thread_pool() > - : count(0), threads(0), free_list(0), inited(false) > + : threads(0), free_list(0), count(0), inited(false), busy(false) > { > } > > @@ -1453,9 +1662,10 @@ rpl_parallel_thread_pool::rpl_parallel_thread_pool() > int > rpl_parallel_thread_pool::init(uint32 size) > { > - count= 0; > threads= NULL; > free_list= NULL; > + count= 0; > + busy= false; > > mysql_mutex_init(key_LOCK_rpl_thread_pool, &LOCK_rpl_thread_pool, > MY_MUTEX_INIT_SLOW); > @@ -1496,8 +1706,14 @@ > rpl_parallel_thread_pool::get_thread(rpl_parallel_thread **owner, > rpl_parallel_thread *rpt; > > mysql_mutex_lock(&LOCK_rpl_thread_pool); > - while ((rpt= free_list) == 
NULL) > + for (;;) > + { > + while (unlikely(busy)) > + mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); > + if ((rpt= free_list) != NULL) > + break; > mysql_cond_wait(&COND_rpl_thread_pool, &LOCK_rpl_thread_pool); > + } > free_list= rpt->next; > mysql_mutex_unlock(&LOCK_rpl_thread_pool); > mysql_mutex_lock(&rpt->LOCK_rpl_thread); > @@ -1908,7 +2124,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd) > > e= (struct rpl_parallel_entry *)my_hash_element(&domain_hash, i); > mysql_mutex_lock(&e->LOCK_parallel_entry); > - e->need_sub_id_signal= true; > + ++e->need_sub_id_signal; > thd->ENTER_COND(&e->COND_parallel_entry, &e->LOCK_parallel_entry, > &stage_waiting_for_workers_idle, &old_stage); > while (e->current_sub_id > e->last_committed_sub_id) > @@ -1921,7 +2137,7 @@ rpl_parallel::wait_for_workers_idle(THD *thd) > } > mysql_cond_wait(&e->COND_parallel_entry, &e->LOCK_parallel_entry); > } > - e->need_sub_id_signal= false; > + --e->need_sub_id_signal; > thd->EXIT_COND(&old_stage); > if (err) > return err; > diff --git a/sql/rpl_parallel.h b/sql/rpl_parallel.h > index 04f8706..77a4005 100644 > --- a/sql/rpl_parallel.h > +++ b/sql/rpl_parallel.h > @@ -199,12 +199,18 @@ struct rpl_parallel_thread { > > > struct rpl_parallel_thread_pool { > - uint32 count; > struct rpl_parallel_thread **threads; > struct rpl_parallel_thread *free_list; > mysql_mutex_t LOCK_rpl_thread_pool; > mysql_cond_t COND_rpl_thread_pool; > + uint32 count; > bool inited; > + /* > + While FTWRL runs (or the pool is being resized), this flag is set to > + make the SQL thread or STOP/START SLAVE not try to start new activity > + while that operation is in progress.
> + */ > + bool busy; > > rpl_parallel_thread_pool(); > int init(uint32 size); > @@ -219,6 +225,12 @@ struct rpl_parallel_entry { > mysql_mutex_t LOCK_parallel_entry; > mysql_cond_t COND_parallel_entry; > uint32 domain_id; > + /* > + Incremented by wait_for_workers_idle() and rpl_pause_for_ftwrl() to show > + that they are waiting, so that finish_event_group knows to signal them > + when last_committed_sub_id is increased. > + */ > + uint32 need_sub_id_signal; > uint64 last_commit_id; > bool active; > /* > @@ -228,12 +240,6 @@ struct rpl_parallel_entry { > */ > bool force_abort; > /* > - Set in wait_for_workers_idle() to show that it is waiting, so that > - finish_event_group knows to signal it when last_committed_sub_id is > - increased. > - */ > - bool need_sub_id_signal; > - /* > At STOP SLAVE (force_abort=true), we do not want to process all events in > the queue (which could unnecessarily delay stop, if a lot of events happen > to be queued). The stop_count provides a safe point at which to stop, so > @@ -291,6 +297,11 @@ struct rpl_parallel_entry { > The value is ULONGLONG_MAX when no error occured. > */ > uint64 stop_on_error_sub_id; > + /* > + During FLUSH TABLES WITH READ LOCK, transactions with sub_id larger than > + this value must not start, but wait until the global read lock is > released. > + */ > + uint64 pause_sub_id; > /* Total count of event groups queued so far. 
*/ > uint64 count_queued_event_groups; > /* > @@ -331,5 +342,7 @@ extern struct rpl_parallel_thread_pool > global_rpl_thread_pool; > extern int rpl_parallel_activate_pool(rpl_parallel_thread_pool *pool); > extern int rpl_parallel_inactivate_pool(rpl_parallel_thread_pool *pool); > extern bool process_gtid_for_restart_pos(Relay_log_info *rli, rpl_gtid > *gtid); > +extern int rpl_pause_for_ftwrl(THD *thd); > +extern void rpl_unpause_for_ftwrl(THD *thd); > > #endif /* RPL_PARALLEL_H */ > diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc > index 5635e9a..ca6b11e 100644 > --- a/sql/sql_parse.cc > +++ b/sql/sql_parse.cc > @@ -4271,6 +4271,17 @@ case SQLCOM_PREPARE: > break; > } > > + if (lex->type & REFRESH_READ_LOCK) > + { > + /* > + We need to pause any parallel replication slave workers during FLUSH > + TABLES WITH READ LOCK. Otherwise we might cause a deadlock, as > + worker threads can run in arbitrary order but need to commit in a > + specific given order. > + */ > + if (rpl_pause_for_ftwrl(thd)) > + goto error; > + } > /* > reload_acl_and_cache() will tell us if we are allowed to write to the > binlog or not. > @@ -4301,6 +4312,8 @@ case SQLCOM_PREPARE: > if (!res) > my_ok(thd); > } > + if (lex->type & REFRESH_READ_LOCK) > + rpl_unpause_for_ftwrl(thd); > > break; > } > _______________________________________________ > commits mailing list > [email protected] > https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits _______________________________________________ Mailing list: https://launchpad.net/~maria-developers Post to : [email protected] Unsubscribe : https://launchpad.net/~maria-developers More help : https://help.launchpad.net/ListHelp

