how about also adding a mode = 'auto' which currently can be == transaction, but in future could be extended to based on statistics flip actual modes back and forth ?
/Jonas On Tue, Oct 7, 2014 at 4:20 PM, <[email protected]> wrote: > At http://bazaar.launchpad.net/~maria-captains/maria/10.0 > > ------------------------------------------------------------ > revno: 4391 > revision-id: [email protected] > parent: [email protected] > committer: Kristian Nielsen <[email protected]> > branch nick: work-10.0-mdev6676 > timestamp: Tue 2014-10-07 16:20:37 +0200 > message: > MDEV-6676: Speculative parallel replication. > > Better syntax for configuring the parallel mode. Now it is a CHANGE > MASTER > option instead of a system variable, which makes it possible to configure > differently for different multi-source replication masters. > > Now also the domain-based replication mode can be disabled. > === modified file 'mysql-test/include/check-testcase.test' > --- a/mysql-test/include/check-testcase.test 2013-12-16 12:02:21 +0000 > +++ b/mysql-test/include/check-testcase.test 2014-10-07 14:20:37 +0000 > @@ -64,6 +64,7 @@ if ($tmp) > --echo Master_SSL_Crlpath # > --echo Using_Gtid No > --echo Gtid_IO_Pos # > + --echo Parallel_Mode domain,groupcommit > } > if (!$tmp) { > # Note: after WL#5177, fields 13-18 shall not be filtered-out. > > === modified file 'mysql-test/r/mysqld--help.result' > --- a/mysql-test/r/mysqld--help.result 2014-09-19 13:25:37 +0000 > +++ b/mysql-test/r/mysqld--help.result 2014-10-07 14:20:37 +0000 > @@ -915,17 +915,6 @@ > parallel replication thread when reading ahead in the > relay log looking for opportunities for parallel > replication. Only used when --slave-parallel-threads > 0. > - --slave-parallel-mode=name > - Controls what transactions are applied in parallel when > - using --slave-parallel-threads. Syntax: > - slave_paralle_mode=value[,value...], where "value" could > - be one or more of: "domain", to apply different > - replication domains in parallel; "groupcommit", to apply > - in parallel transactions that group-committed together on > - the master; "transactional", to optimistically try to > - apply all transactional DML in parallel; and "waiting" to > - extend "transactional" to even transactions that had to > - wait on the master. > --slave-parallel-threads=# > If non-zero, number of threads to spawn to apply in > parallel events on the slave that were group-committed on > @@ -1320,7 +1309,6 @@ slave-exec-mode STRICT > slave-max-allowed-packet 1073741824 > slave-net-timeout 3600 > slave-parallel-max-queued 131072 > -slave-parallel-mode domain,groupcommit > slave-parallel-threads 0 > slave-skip-errors (No default value) > slave-sql-verify-checksum TRUE > > === modified file 'mysql-test/suite/rpl/r/rpl_parallel.result' > --- a/mysql-test/suite/rpl/r/rpl_parallel.result 2014-08-15 > 09:31:13 +0000 > +++ b/mysql-test/suite/rpl/r/rpl_parallel.result 2014-10-07 > 14:20:37 +0000 > @@ -1,5 +1,5 @@ > -include/rpl_init.inc [topology=1->2] > -SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; > +include/master-slave.inc > +[connection master] > SET GLOBAL slave_parallel_threads=10; > ERROR HY000: This operation cannot be performed as you have a running > slave ''; run STOP SLAVE '' first > include/stop_slave.inc > @@ -923,8 +923,79 @@ SELECT * FROM t2 WHERE a >= 30 ORDER BY > 32 > 33 > 34 > +*** MDEV-6676 - test syntax of CHANGE MASTER TO PARALLEL_MODE=xxx *** > +Parallel_Mode = 'domain,groupcommit' > include/stop_slave.inc > -SET GLOBAL slave_parallel_threads=@old_parallel_threads; > +CHANGE MASTER TO parallel_mode=(,domain,groupcommit); > +ERROR 42000: You have an error in your SQL syntax; check the manual that > corresponds to your MariaDB server version for the right syntax to use near > 'domain,groupcommit)' at line 1 > +CHANGE MASTER TO parallel_mode=(domain,groupcommit,transactional); > +ERROR HY000: Invalid use of 'transactional' option for PARALLEL_MODE in > CHANGE MASTER TO > +CHANGE MASTER TO parallel_mode=(waiting,domain,transactional,waiting); > +ERROR HY000: Invalid use of 'waiting' option for PARALLEL_MODE in CHANGE > MASTER TO > +CHANGE MASTER TO parallel_mode=(domain,domain); > +ERROR HY000: Invalid use of 'domain' option for PARALLEL_MODE in CHANGE > MASTER TO > +CHANGE MASTER TO parallel_mode=(waiting,transactional,domain); > +Parallel_Mode = 'domain,transactional,waiting' > +CHANGE MASTER TO parallel_mode=(domain,groupcommit); > +Parallel_Mode = 'domain,groupcommit' > +*** MDEV-6676 - test that empty parallel_mode does not replicate in > parallel *** > +INSERT INTO t2 VALUES (40); > +include/save_master_gtid.inc > +CHANGE MASTER TO parallel_mode=(); > +SET @old_dbug= @@GLOBAL.debug_dbug; > +SET GLOBAL debug_dbug="+d,slave_crash_if_parallel_apply"; > +include/start_slave.inc > +include/sync_with_master_gtid.inc > +SELECT * FROM t2 WHERE a >= 40 ORDER BY a; > +a > +40 > +include/stop_slave.inc > +SET GLOBAL debug_dbug=@old_dbug; > +*** MDEV-6676 - test disabling domain-based parallel replication *** > +SET gtid_domain_id = 1; > +INSERT INTO t2 VALUES (41); > +INSERT INTO t2 VALUES (42); > +INSERT INTO t2 VALUES (43); > +INSERT INTO t2 VALUES (44); > +INSERT INTO t2 VALUES (45); > +INSERT INTO t2 VALUES (46); > +DELETE FROM t2 WHERE a >= 41; > +SET gtid_domain_id = 2; > +INSERT INTO t2 VALUES (41); > +INSERT INTO t2 VALUES (42); > +INSERT INTO t2 VALUES (43); > +INSERT INTO t2 VALUES (44); > +INSERT INTO t2 VALUES (45); > +INSERT INTO t2 VALUES (46); > +SET gtid_domain_id = 0; > +include/save_master_gtid.inc > +CHANGE MASTER TO parallel_mode=(groupcommit); > +include/start_slave.inc > +include/sync_with_master_gtid.inc > +SELECT * FROM t2 WHERE a >= 40 ORDER BY a; > +a > +40 > +41 > +42 > +43 > +44 > +45 > +46 > +include/stop_slave.inc > +CHANGE MASTER TO parallel_mode=(domain,groupcommit); > +include/start_slave.inc > +*** MDEV-6676 - test that parallel mode is saved correctly in master.info > across server restart *** > +Parallel_Mode = 'domain,groupcommit' > +include/stop_slave.inc > +CHANGE MASTER TO parallel_mode=(transactional,waiting); > +Parallel_Mode = 'transactional,waiting' > +include/start_slave.inc > +include/rpl_restart_server.inc [server_number=2] > +Parallel_Mode = 'transactional,waiting' > +CHANGE MASTER TO parallel_mode=(domain,groupcommit); > +include/start_slave.inc > +include/stop_slave.inc > +SET GLOBAL slave_parallel_threads=0; > include/start_slave.inc > SET DEBUG_SYNC= 'RESET'; > DROP function foo; > > === modified file 'mysql-test/suite/rpl/r/rpl_parallel_optimistic.result' > --- a/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result 2014-09-24 > 19:25:10 +0000 > +++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic.result 2014-10-07 > 14:20:37 +0000 > @@ -2,11 +2,9 @@ include/rpl_init.inc [topology=1->2] > ALTER TABLE mysql.gtid_slave_pos ENGINE=InnoDB; > CREATE TABLE t1 (a int PRIMARY KEY, b INT) ENGINE=InnoDB; > SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; > -SET @old_mode=@@GLOBAL.slave_parallel_mode; > include/stop_slave.inc > SET GLOBAL slave_parallel_threads=10; > -SET GLOBAL slave_parallel_mode="domain,transactional"; > -CHANGE MASTER TO master_use_gtid=slave_pos; > +CHANGE MASTER TO master_use_gtid=slave_pos, > parallel_mode=(domain,transactional); > INSERT INTO t1 VALUES(1,1); > BEGIN; > INSERT INTO t1 VALUES(2,1); > @@ -288,8 +286,8 @@ SET GLOBAL binlog_format= @old_format; > SET GLOBAL tx_isolation= @old_isolation; > include/start_slave.inc > include/stop_slave.inc > +CHANGE MASTER TO parallel_mode=(domain,groupcommit); > SET GLOBAL slave_parallel_threads=@old_parallel_threads; > -SET GLOBAL slave_parallel_mode=@old_mode; > include/start_slave.inc > DROP TABLE t1, t2, t3; > include/rpl_end.inc > > === modified file > 'mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result' > --- a/mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result > 2014-09-24 19:25:10 +0000 > +++ b/mysql-test/suite/rpl/r/rpl_parallel_optimistic_nobinlog.result > 2014-10-07 14:20:37 +0000 > @@ -6,12 +6,10 @@ INSERT INTO t1 VALUES (1,0), (2,0), (3,0 > INSERT INTO t2 VALUES (1,0), (2,0); > SET @old_isolation= @@GLOBAL.tx_isolation; > SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; > -SET @old_mode=@@GLOBAL.slave_parallel_mode; > include/stop_slave.inc > SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED; > SET GLOBAL slave_parallel_threads=10; > -SET GLOBAL slave_parallel_mode="domain,transactional,waiting"; > -CHANGE MASTER TO master_use_gtid=slave_pos; > +CHANGE MASTER TO master_use_gtid=slave_pos, > parallel_mode=(domain,transactional,waiting); > *** Test that we replicate correctly when using READ COMMITTED and > --log-slave-updates=0 on the slave *** > INSERT INTO t1 SELECT 4, COUNT(*) FROM t2; > INSERT INTO t2 SELECT 4, COUNT(*) FROM t1; > @@ -78,8 +76,8 @@ a b > 10 10 > include/stop_slave.inc > SET GLOBAL tx_isolation= @old_isolation; > +CHANGE MASTER TO parallel_mode=(domain,groupcommit); > SET GLOBAL slave_parallel_threads=@old_parallel_threads; > -SET GLOBAL slave_parallel_mode=@old_mode; > include/start_slave.inc > DROP TABLE t1, t2; > include/rpl_end.inc > > === modified file 'mysql-test/suite/rpl/t/rpl_parallel.test' > --- a/mysql-test/suite/rpl/t/rpl_parallel.test 2014-08-15 09:31:13 +0000 > +++ b/mysql-test/suite/rpl/t/rpl_parallel.test 2014-10-07 14:20:37 +0000 > @@ -1,13 +1,12 @@ > --source include/have_innodb.inc > --source include/have_debug.inc > --source include/have_debug_sync.inc > ---let $rpl_topology=1->2 > ---source include/rpl_init.inc > +--source include/master-slave.inc > > # Test various aspects of parallel replication. > > --connection server_2 > -SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; > +--let $old_parallel_threads=`SELECT @@GLOBAL.slave_parallel_threads` > --error ER_SLAVE_MUST_STOP > SET GLOBAL slave_parallel_threads=10; > --source include/stop_slave.inc > @@ -1466,9 +1465,100 @@ SET sql_slave_skip_counter= 1; > SELECT * FROM t2 WHERE a >= 30 ORDER BY a; > > > +--echo *** MDEV-6676 - test syntax of CHANGE MASTER TO PARALLEL_MODE=xxx > *** > +--connection server_2 > + > +--let $status_items= Parallel_Mode > +--source include/show_slave_status.inc > +--source include/stop_slave.inc > +--error ER_PARSE_ERROR > +CHANGE MASTER TO parallel_mode=(,domain,groupcommit); > +--error ER_INVALID_SLAVE_PARALLEL_MODE > +CHANGE MASTER TO parallel_mode=(domain,groupcommit,transactional); > +--error ER_INVALID_SLAVE_PARALLEL_MODE > +CHANGE MASTER TO parallel_mode=(waiting,domain,transactional,waiting); > +--error ER_INVALID_SLAVE_PARALLEL_MODE > +CHANGE MASTER TO parallel_mode=(domain,domain); > +CHANGE MASTER TO parallel_mode=(waiting,transactional,domain); > +--let $status_items= Parallel_Mode > +--source include/show_slave_status.inc > +CHANGE MASTER TO parallel_mode=(domain,groupcommit); > +--let $status_items= Parallel_Mode > +--source include/show_slave_status.inc > + > + > +--echo *** MDEV-6676 - test that empty parallel_mode does not replicate > in parallel *** > +--connection server_1 > +INSERT INTO t2 VALUES (40); > +--source include/save_master_gtid.inc > + > +--connection server_2 > +CHANGE MASTER TO parallel_mode=(); > +# Test that we do not use parallel apply, by injecting an unconditional > +# crash in the parallel apply code. > +SET @old_dbug= @@GLOBAL.debug_dbug; > +SET GLOBAL debug_dbug="+d,slave_crash_if_parallel_apply"; > +--source include/start_slave.inc > +--source include/sync_with_master_gtid.inc > +SELECT * FROM t2 WHERE a >= 40 ORDER BY a; > +--source include/stop_slave.inc > +SET GLOBAL debug_dbug=@old_dbug; > + > + > +--echo *** MDEV-6676 - test disabling domain-based parallel replication > *** > +--connection server_1 > +# Let's do a bunch of transactions that will conflict if run out-of-order > in > +# domain-based parallel replication mode. > +SET gtid_domain_id = 1; > +INSERT INTO t2 VALUES (41); > +INSERT INTO t2 VALUES (42); > +INSERT INTO t2 VALUES (43); > +INSERT INTO t2 VALUES (44); > +INSERT INTO t2 VALUES (45); > +INSERT INTO t2 VALUES (46); > +DELETE FROM t2 WHERE a >= 41; > +SET gtid_domain_id = 2; > +INSERT INTO t2 VALUES (41); > +INSERT INTO t2 VALUES (42); > +INSERT INTO t2 VALUES (43); > +INSERT INTO t2 VALUES (44); > +INSERT INTO t2 VALUES (45); > +INSERT INTO t2 VALUES (46); > +SET gtid_domain_id = 0; > +--source include/save_master_gtid.inc > +--connection server_2 > +CHANGE MASTER TO parallel_mode=(groupcommit); > +--source include/start_slave.inc > +--source include/sync_with_master_gtid.inc > +SELECT * FROM t2 WHERE a >= 40 ORDER BY a; > +--source include/stop_slave.inc > +CHANGE MASTER TO parallel_mode=(domain,groupcommit); > +--source include/start_slave.inc > + > + > +--echo *** MDEV-6676 - test that parallel mode is saved correctly in > master.info across server restart *** > +--connection server_2 > +--let $status_items= Parallel_Mode > +--source include/show_slave_status.inc > +--source include/stop_slave.inc > +CHANGE MASTER TO parallel_mode=(transactional,waiting); > +--let $status_items= Parallel_Mode > +--source include/show_slave_status.inc > +--source include/start_slave.inc > + > +--let $rpl_server_number= 2 > +--source include/rpl_restart_server.inc > + > +--connection server_2 > +--let $status_items= Parallel_Mode > +--source include/show_slave_status.inc > +CHANGE MASTER TO parallel_mode=(domain,groupcommit); > +--source include/start_slave.inc > + > + > --connection server_2 > --source include/stop_slave.inc > -SET GLOBAL slave_parallel_threads=@old_parallel_threads; > +eval SET GLOBAL slave_parallel_threads=$old_parallel_threads; > --source include/start_slave.inc > SET DEBUG_SYNC= 'RESET'; > > > === modified file 'mysql-test/suite/rpl/t/rpl_parallel_optimistic.test' > --- a/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test 2014-09-24 > 19:25:10 +0000 > +++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic.test 2014-10-07 > 14:20:37 +0000 > @@ -11,11 +11,9 @@ CREATE TABLE t1 (a int PRIMARY KEY, b IN > --connection server_2 > --sync_with_master > SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; > -SET @old_mode=@@GLOBAL.slave_parallel_mode; > --source include/stop_slave.inc > SET GLOBAL slave_parallel_threads=10; > -SET GLOBAL slave_parallel_mode="domain,transactional"; > -CHANGE MASTER TO master_use_gtid=slave_pos; > +CHANGE MASTER TO master_use_gtid=slave_pos, > parallel_mode=(domain,transactional); > > > --connection server_1 > @@ -309,8 +307,8 @@ SET GLOBAL tx_isolation= @old_isolation; > > --connection server_2 > --source include/stop_slave.inc > +CHANGE MASTER TO parallel_mode=(domain,groupcommit); > SET GLOBAL slave_parallel_threads=@old_parallel_threads; > -SET GLOBAL slave_parallel_mode=@old_mode; > --source include/start_slave.inc > > --connection server_1 > > === modified file > 'mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test' > --- a/mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test > 2014-09-24 19:25:10 +0000 > +++ b/mysql-test/suite/rpl/t/rpl_parallel_optimistic_nobinlog.test > 2014-10-07 14:20:37 +0000 > @@ -16,12 +16,10 @@ INSERT INTO t2 VALUES (1,0), (2,0); > --sync_with_master > SET @old_isolation= @@GLOBAL.tx_isolation; > SET @old_parallel_threads=@@GLOBAL.slave_parallel_threads; > -SET @old_mode=@@GLOBAL.slave_parallel_mode; > --source include/stop_slave.inc > SET GLOBAL TRANSACTION ISOLATION LEVEL READ COMMITTED; > SET GLOBAL slave_parallel_threads=10; > -SET GLOBAL slave_parallel_mode="domain,transactional,waiting"; > -CHANGE MASTER TO master_use_gtid=slave_pos; > +CHANGE MASTER TO master_use_gtid=slave_pos, > parallel_mode=(domain,transactional,waiting); > > > --echo *** Test that we replicate correctly when using READ COMMITTED and > --log-slave-updates=0 on the slave *** > @@ -63,8 +61,8 @@ SELECT * FROM t2 ORDER BY a; > --connection server_2 > --source include/stop_slave.inc > SET GLOBAL tx_isolation= @old_isolation; > +CHANGE MASTER TO parallel_mode=(domain,groupcommit); > SET GLOBAL slave_parallel_threads=@old_parallel_threads; > -SET GLOBAL slave_parallel_mode=@old_mode; > --source include/start_slave.inc > > --connection server_1 > > === modified file 'sql/lex.h' > --- a/sql/lex.h 2014-02-02 09:00:36 +0000 > +++ b/sql/lex.h 2014-10-07 14:20:37 +0000 > @@ -194,6 +194,7 @@ static SYMBOL symbols[] = { > { "DISTINCTROW", SYM(DISTINCT)}, /* Access likes this */ > { "DIV", SYM(DIV_SYM)}, > { "DO", SYM(DO_SYM)}, > + { "DOMAIN", SYM(DOMAIN_SYM)}, > { "DOUBLE", SYM(DOUBLE_SYM)}, > { "DROP", SYM(DROP)}, > { "DUAL", SYM(DUAL_SYM)}, > @@ -257,6 +258,7 @@ static SYMBOL symbols[] = { > { "GRANT", SYM(GRANT)}, > { "GRANTS", SYM(GRANTS)}, > { "GROUP", SYM(GROUP_SYM)}, > + { "GROUPCOMMIT", SYM(GROUPCOMMIT_SYM)}, > { "HANDLER", SYM(HANDLER_SYM)}, > { "HARD", SYM(HARD_SYM)}, > { "HASH", SYM(HASH_SYM)}, > @@ -429,6 +431,7 @@ static SYMBOL symbols[] = { > { "PACK_KEYS", SYM(PACK_KEYS_SYM)}, > { "PAGE", SYM(PAGE_SYM)}, > { "PAGE_CHECKSUM", SYM(PAGE_CHECKSUM_SYM)}, > + { "PARALLEL_MODE", SYM(PARALLEL_MODE_SYM)}, > { "PARSER", SYM(PARSER_SYM)}, > { "PARSE_VCOL_EXPR", SYM(PARSE_VCOL_EXPR_SYM)}, > { "PARTIAL", SYM(PARTIAL)}, > @@ -650,6 +653,7 @@ static SYMBOL symbols[] = { > { "VIEW", SYM(VIEW_SYM)}, > { "VIRTUAL", SYM(VIRTUAL_SYM)}, > { "WAIT", SYM(WAIT_SYM)}, > + { "WAITING", SYM(WAITING_SYM)}, > { "WARNINGS", SYM(WARNINGS)}, > { "WEEK", SYM(WEEK_SYM)}, > { "WEIGHT_STRING", SYM(WEIGHT_STRING_SYM)}, > > === modified file 'sql/log_event.cc' > --- a/sql/log_event.cc 2014-09-25 13:47:58 +0000 > +++ b/sql/log_event.cc 2014-10-07 14:20:37 +0000 > @@ -12728,7 +12728,7 @@ bool rpl_get_position_info(const char ** > return FALSE; > #else > const Relay_log_info *rli= &(active_mi->rli); > - if (opt_slave_parallel_threads == 0) > + if (!rli->mi->using_parallel()) > { > *log_file_name= rli->group_master_log_name; > *log_pos= rli->group_master_log_pos + > > === modified file 'sql/mysqld.cc' > --- a/sql/mysqld.cc 2014-09-26 10:37:19 +0000 > +++ b/sql/mysqld.cc 2014-10-07 14:20:37 +0000 > @@ -562,8 +562,6 @@ ulong stored_program_cache_size= 0; > > ulong opt_slave_parallel_threads= 0; > ulong opt_slave_domain_parallel_threads= 0; > -ulonglong opt_slave_parallel_mode= > - SLAVE_PARALLEL_DOMAIN | SLAVE_PARALLEL_GROUPCOMMIT; > ulong opt_binlog_commit_wait_count= 0; > ulong opt_binlog_commit_wait_usec= 0; > ulong opt_slave_parallel_max_queued= 131072; > > === modified file 'sql/mysqld.h' > --- a/sql/mysqld.h 2014-09-26 10:37:19 +0000 > +++ b/sql/mysqld.h 2014-10-07 14:20:37 +0000 > @@ -187,7 +187,6 @@ extern ulong stored_program_cache_size; > extern ulong opt_slave_parallel_threads; > extern ulong opt_slave_domain_parallel_threads; > extern ulong opt_slave_parallel_max_queued; > -extern ulonglong opt_slave_parallel_mode; > extern ulong opt_binlog_commit_wait_count; > extern ulong opt_binlog_commit_wait_usec; > extern my_bool opt_gtid_ignore_duplicates; > > === modified file 'sql/rpl_mi.cc' > --- a/sql/rpl_mi.cc 2014-09-02 12:07:01 +0000 > +++ b/sql/rpl_mi.cc 2014-10-07 14:20:37 +0000 > @@ -40,7 +40,8 @@ Master_info::Master_info(LEX_STRING *con > heartbeat_period(0), received_heartbeats(0), master_id(0), > prev_master_id(0), > using_gtid(USE_GTID_NO), events_queued_since_last_gtid(0), > - gtid_reconnect_event_skip_count(0), gtid_event_seen(false) > + gtid_reconnect_event_skip_count(0), gtid_event_seen(false), > + parallel_mode(SLAVE_PARALLEL_DOMAIN|SLAVE_PARALLEL_GROUPCOMMIT) > { > host[0] = 0; user[0] = 0; password[0] = 0; > ssl_ca[0]= 0; ssl_capath[0]= 0; ssl_cert[0]= 0; > @@ -178,6 +179,7 @@ void init_master_log_pos(Master_info* mi > mi->events_queued_since_last_gtid= 0; > mi->gtid_reconnect_event_skip_count= 0; > mi->gtid_event_seen= false; > + mi->parallel_mode= SLAVE_PARALLEL_DOMAIN|SLAVE_PARALLEL_GROUPCOMMIT; > > /* Intentionally init ssl_verify_server_cert to 0, no option available > */ > mi->ssl_verify_server_cert= 0; > @@ -514,6 +516,22 @@ file '%s')", fname); > else > mi->using_gtid= Master_info::USE_GTID_NO; > } > + else if (!strncmp(buf, STRING_WITH_LEN("parallel_mode="))) > + { > + int val= atoi(buf + (sizeof("parallel_mode=") - 1)); > + mi->parallel_mode= val & (SLAVE_PARALLEL_DOMAIN | > + SLAVE_PARALLEL_GROUPCOMMIT | > + SLAVE_PARALLEL_TRX | > + SLAVE_PARALLEL_WAITING); > + } > + else if (!strncmp(buf, STRING_WITH_LEN("END_MARKER"))) > + { > + /* > + Guard agaist extra left-overs at the end of file, in case a > later > + update causes the file to shrink compared to earlier > contents. > + */ > + break; > + } > } > } > } > @@ -657,7 +675,9 @@ int flush_master_info(Master_info* mi, > my_b_printf(file, > > > "%u\n%s\n%s\n%s\n%s\n%s\n%d\n%d\n%d\n%s\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n%s\n%s\n%d\n%s\n%s\n" > "\n\n\n\n\n\n\n\n\n\n\n" > - "using_gtid=%d\n", > + "using_gtid=%d\n" > + "parallel_mode=%u\n" > + "END_MARKER\n", > LINES_IN_MASTER_INFO, > mi->master_log_name, llstr(mi->master_log_pos, lbuf), > mi->host, mi->user, > @@ -666,7 +686,7 @@ int flush_master_info(Master_info* mi, > mi->ssl_cipher, mi->ssl_key, mi->ssl_verify_server_cert, > heartbeat_buf, "", ignore_server_ids_buf, > "", 0, > - mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid); > + mi->ssl_crl, mi->ssl_crlpath, mi->using_gtid, > mi->parallel_mode); > my_free(ignore_server_ids_buf); > err= flush_io_cache(file); > if (sync_masterinfo_period && !err && > > === modified file 'sql/rpl_mi.h' > --- a/sql/rpl_mi.h 2014-09-02 12:07:01 +0000 > +++ b/sql/rpl_mi.h 2014-10-07 14:20:37 +0000 > @@ -75,6 +75,10 @@ class Master_info : public Slave_reporti > return connection_name.str == 0; > } > static const char *using_gtid_astext(enum enum_using_gtid arg); > + bool using_parallel() > + { > + return opt_slave_parallel_threads > 0 && parallel_mode != 0; > + } > > /* the variables below are needed because we can change masters on the > fly */ > char master_log_name[FN_REFLEN+6]; /* Room for multi-*/ > @@ -178,6 +182,12 @@ class Master_info : public Slave_reporti > uint64 gtid_reconnect_event_skip_count; > /* gtid_event_seen is false until we receive first GTID event from > master. */ > bool gtid_event_seen; > + /* > + The parallel replication modes, if any. A combination (binary OR) of > any of > + SLAVE_PARALLEL_DOMAIN, SLAVE_PARALLEL_GROUPCOMMIT, > SLAVE_PARALLEL_TRX, and > + SLAVE_PARALLEL_WAITING. > + */ > + uint32 parallel_mode; > }; > int init_master_info(Master_info* mi, const char* master_info_fname, > const char* slave_info_fname, > > === modified file 'sql/rpl_parallel.cc' > --- a/sql/rpl_parallel.cc 2014-10-03 09:43:23 +0000 > +++ b/sql/rpl_parallel.cc 2014-10-07 14:20:37 +0000 > @@ -1891,6 +1891,7 @@ rpl_parallel::do_event(rpl_group_info *s > bool did_enter_cond= false; > PSI_stage_info old_stage; > > + DBUG_EXECUTE_IF("slave_crash_if_parallel_apply", DBUG_SUICIDE();); > /* Handle master log name change, seen in Rotate_log_event. */ > typ= ev->get_type_code(); > if (unlikely(typ == ROTATE_EVENT)) > @@ -1971,7 +1972,8 @@ rpl_parallel::do_event(rpl_group_info *s > if (typ == GTID_EVENT) > { > Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); > - uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO ? > + uint32 domain_id= (rli->mi->using_gtid == Master_info::USE_GTID_NO || > + !(rli->mi->parallel_mode & SLAVE_PARALLEL_DOMAIN) ? > 0 : gtid_ev->domain_id); > if (!(e= find(domain_id))) > { > @@ -2012,7 +2014,7 @@ rpl_parallel::do_event(rpl_group_info *s > { > Gtid_log_event *gtid_ev= static_cast<Gtid_log_event *>(ev); > bool new_gco; > - ulonglong mode= opt_slave_parallel_mode; > + ulonglong mode= rli->mi->parallel_mode; > uchar gtid_flags= gtid_ev->flags2; > group_commit_orderer *gco; > uint8 force_switch_flag; > > === modified file 'sql/rpl_parallel.h' > --- a/sql/rpl_parallel.h 2014-10-03 09:43:23 +0000 > +++ b/sql/rpl_parallel.h 2014-10-07 14:20:37 +0000 > @@ -12,7 +12,11 @@ class Relay_log_info; > struct inuse_relaylog; > > > -/* Bit masks for the values in --slave-parallel-mode. */ > +/* > + Bit masks for the values in --slave-parallel-mode. > + Note that these values cannot be changed - they are stored in > master.info, > + so need to be possible to read back in a different version of the > server. > +*/ > #define SLAVE_PARALLEL_DOMAIN (1ULL << 0) > #define SLAVE_PARALLEL_GROUPCOMMIT (1ULL << 1) > #define SLAVE_PARALLEL_TRX (1ULL << 2) > > === modified file 'sql/share/errmsg-utf8.txt' > --- a/sql/share/errmsg-utf8.txt 2014-07-08 17:38:26 +0000 > +++ b/sql/share/errmsg-utf8.txt 2014-10-07 14:20:37 +0000 > @@ -7111,3 +7111,5 @@ ER_SLAVE_SKIP_NOT_IN_GTID > eng "When using GTID, @@sql_slave_skip_counter can not be used. > Instead, setting @@gtid_slave_pos explicitly can be used to skip to after a > given GTID position." > ER_TABLE_DEFINITION_TOO_BIG > eng "The definition for table %`s is too big" > +ER_INVALID_SLAVE_PARALLEL_MODE > + eng "Invalid use of '%s' option for PARALLEL_MODE in CHANGE > MASTER TO" > > === modified file 'sql/slave.cc' > --- a/sql/slave.cc 2014-09-02 12:07:01 +0000 > +++ b/sql/slave.cc 2014-10-07 14:20:37 +0000 > @@ -625,8 +625,7 @@ int terminate_slave_threads(Master_info* > if (thread_mask & (SLAVE_SQL|SLAVE_FORCE_ALL)) > { > DBUG_PRINT("info",("Terminating SQL thread")); > - if (opt_slave_parallel_threads > 0 && > - mi->rli.abort_slave && mi->rli.stop_for_until) > + if (mi->using_parallel() && mi->rli.abort_slave && > mi->rli.stop_for_until) > { > mi->rli.stop_for_until= false; > mi->rli.parallel.stop_during_until(); > @@ -2576,6 +2575,8 @@ static bool send_show_master_info_header > field_list.push_back(new Item_empty_string("Using_Gtid", > sizeof("Current_Pos")-1)); > field_list.push_back(new Item_empty_string("Gtid_IO_Pos", 30)); > + field_list.push_back(new Item_empty_string("Parallel_Mode", > + sizeof("domain,groupcommit,transactional,waiting")-1)); > if (full) > { > field_list.push_back(new Item_return_int("Retried_transactions", > @@ -2708,8 +2709,7 @@ static bool send_show_master_info_data(T > else > { > idle= mi->rli.sql_thread_caught_up; > - if (opt_slave_parallel_threads > 0 && idle && > - !mi->rli.parallel.workers_idle()) > + if (mi->using_parallel() && idle && > !mi->rli.parallel.workers_idle()) > idle= false; > } > if (idle) > @@ -2786,13 +2786,34 @@ static bool send_show_master_info_data(T > protocol->store(mi->ssl_ca, &my_charset_bin); > // Master_Ssl_Crlpath > protocol->store(mi->ssl_capath, &my_charset_bin); > + // Using_Gtid > protocol->store(mi->using_gtid_astext(mi->using_gtid), > &my_charset_bin); > + // Gtid_IO_Pos > { > char buff[30]; > String tmp(buff, sizeof(buff), system_charset_info); > mi->gtid_current_pos.to_string(&tmp); > protocol->store(tmp.ptr(), tmp.length(), &my_charset_bin); > } > + // Parallel_Mode > + { > + /* Note how sizeof("domain") has room for "domain," due to traling > 0. */ > + char buf[sizeof("domain") + sizeof("groupcommit") + > + sizeof("transactional") + sizeof("waiting") + 1]; > + char *p= buf; > + uint32 mode= mi->parallel_mode; > + if (mode & SLAVE_PARALLEL_DOMAIN) > + p= strmov(p, "domain,"); > + if (mode & SLAVE_PARALLEL_GROUPCOMMIT) > + p= strmov(p, "groupcommit,"); > + if (mode & SLAVE_PARALLEL_TRX) > + p= strmov(p, "transactional,"); > + if (mode & SLAVE_PARALLEL_WAITING) > + p= strmov(p, "waiting,"); > + if (p != buf) > + --p; // Discard last ',' > + protocol->store(buf, p-buf, &my_charset_bin); > + } > if (full) > { > protocol->store((uint32) mi->rli.retried_trans); > @@ -3521,7 +3542,7 @@ static int exec_relay_log_event(THD* thd > > update_state_of_relay_log(rli, ev); > > - if (opt_slave_parallel_threads > 0) > + if (rli->mi->using_parallel()) > { > int res= rli->parallel.do_event(serial_rgi, ev, event_size); > if (res >= 0) > @@ -4667,7 +4688,7 @@ log '%s' at position %s, relay log '%s' > } > } > > - if (opt_slave_parallel_threads > 0) > + if (mi->using_parallel()) > rli->parallel.wait_for_done(thd, rli); > > /* Thread stopped. Print the current replication position to the log */ > @@ -4693,7 +4714,7 @@ log '%s' at position %s, relay log '%s' > (We want the first one to be before the printout of stop position to > get the correct position printed.) > */ > - if (opt_slave_parallel_threads > 0) > + if (mi->using_parallel()) > rli->parallel.wait_for_done(thd, rli); > > /* > @@ -6314,7 +6335,7 @@ static Log_event* next_event(rpl_group_i > llstr(my_b_tell(cur_log),llbuf1), > llstr(rli->event_relay_log_pos,llbuf2))); > DBUG_ASSERT(my_b_tell(cur_log) >= BIN_LOG_HEADER_SIZE); > - DBUG_ASSERT(opt_slave_parallel_threads > 0 || > + DBUG_ASSERT(rli->mi->using_parallel() || > my_b_tell(cur_log) == rli->event_relay_log_pos); > } > #endif > > === modified file 'sql/sql_lex.h' > --- a/sql/sql_lex.h 2014-08-07 16:06:56 +0000 > +++ b/sql/sql_lex.h 2014-10-07 14:20:37 +0000 > @@ -218,11 +218,18 @@ struct LEX_MASTER_INFO > uint port, connect_retry; > float heartbeat_period; > /* > + Modes of parallel replication enabled, if any. A combination (binary > OR) of > + any of SLAVE_PARALLEL_DOMAIN, SLAVE_PARALLEL_GROUPCOMMIT, > + SLAVE_PARALLEL_TRX, and SLAVE_PARALLEL_WAITING. > + */ > + uint32 repl_parallel_mode; > + /* > Enum is used for making it possible to detect if the user > changed variable or if it should be left at old value > */ > enum {LEX_MI_UNCHANGED, LEX_MI_DISABLE, LEX_MI_ENABLE} > - ssl, ssl_verify_server_cert, heartbeat_opt, > repl_ignore_server_ids_opt; > + ssl, ssl_verify_server_cert, heartbeat_opt, > repl_ignore_server_ids_opt, > + repl_parallel_mode_opt; > enum { > LEX_GTID_UNCHANGED, LEX_GTID_NO, LEX_GTID_CURRENT_POS, > LEX_GTID_SLAVE_POS > } use_gtid_opt; > @@ -241,10 +248,11 @@ struct LEX_MASTER_INFO > pos= relay_log_pos= server_id= port= connect_retry= 0; > heartbeat_period= 0; > ssl= ssl_verify_server_cert= heartbeat_opt= > - repl_ignore_server_ids_opt= LEX_MI_UNCHANGED; > + repl_ignore_server_ids_opt= repl_parallel_mode_opt= > LEX_MI_UNCHANGED; > gtid_pos_str.length= 0; > gtid_pos_str.str= NULL; > use_gtid_opt= LEX_GTID_UNCHANGED; > + repl_parallel_mode= 0; > } > }; > > > === modified file 'sql/sql_repl.cc' > --- a/sql/sql_repl.cc 2014-08-07 16:06:56 +0000 > +++ b/sql/sql_repl.cc 2014-10-07 14:20:37 +0000 > @@ -3434,6 +3434,9 @@ bool change_master(THD* thd, Master_info > lex_mi->relay_log_name || lex_mi->relay_log_pos) > mi->using_gtid= Master_info::USE_GTID_NO; > > + if (lex_mi->repl_parallel_mode_opt != LEX_MASTER_INFO::LEX_MI_UNCHANGED) > + mi->parallel_mode= lex_mi->repl_parallel_mode; > + > /* > If user did specify neither host nor port nor any log name nor any log > pos, i.e. he specified only user/password/master_connect_retry, he > probably > > === modified file 'sql/sql_yacc.yy' > --- a/sql/sql_yacc.yy 2014-09-03 14:24:31 +0000 > +++ b/sql/sql_yacc.yy 2014-10-07 14:20:37 +0000 > @@ -1130,6 +1130,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** > %token DIV_SYM > %token DOUBLE_SYM /* SQL-2003-R */ > %token DO_SYM > +%token DOMAIN_SYM > %token DROP /* SQL-2003-R */ > %token DUAL_SYM > %token DUMPFILE > @@ -1194,6 +1195,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** > %token GRANT /* SQL-2003-R */ > %token GRANTS > %token GROUP_SYM /* SQL-2003-R */ > +%token GROUPCOMMIT_SYM > %token GROUP_CONCAT_SYM > %token GT_SYM /* OPERATOR */ > %token HANDLER_SYM > @@ -1376,6 +1378,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** > %token PACK_KEYS_SYM > %token PAGE_SYM > %token PAGE_CHECKSUM_SYM > +%token PARALLEL_MODE_SYM > %token PARAM_MARKER > %token PARSER_SYM > %token PARSE_VCOL_EXPR_SYM > @@ -1604,6 +1607,7 @@ bool my_yyoverflow(short **a, YYSTYPE ** > %token VIEW_SYM /* SQL-2003-N */ > %token VIRTUAL_SYM > %token WAIT_SYM > +%token WAITING_SYM > %token WARNINGS > %token WEEK_SYM > %token WEIGHT_STRING_SYM > @@ -2244,6 +2248,14 @@ rule: <-- starts at col 1 > { > Lex->mi.repl_ignore_server_ids_opt= > LEX_MASTER_INFO::LEX_MI_ENABLE; > } > + | PARALLEL_MODE_SYM EQ '(' ')' > + { > + Lex->mi.repl_parallel_mode_opt= > LEX_MASTER_INFO::LEX_MI_DISABLE; > + } > + | PARALLEL_MODE_SYM EQ '(' parallel_mode_list ')' > + { > + Lex->mi.repl_parallel_mode_opt= > LEX_MASTER_INFO::LEX_MI_ENABLE; > + } > | > master_file_def > ; > @@ -2260,6 +2272,52 @@ rule: <-- starts at col 1 > insert_dynamic(&Lex->mi.repl_ignore_server_ids, (uchar*) > &($1)); > } > > +parallel_mode_list: > + parallel_mode_option > + | parallel_mode_list ',' parallel_mode_option > + ; > + > +parallel_mode_option: > + DOMAIN_SYM > + { > + if (Lex->mi.repl_parallel_mode & SLAVE_PARALLEL_DOMAIN) > + { > + my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), > "domain"); > + MYSQL_YYABORT; > + } > + Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_DOMAIN; > + } > + | GROUPCOMMIT_SYM > + { > + if (Lex->mi.repl_parallel_mode & > + (SLAVE_PARALLEL_GROUPCOMMIT|SLAVE_PARALLEL_TRX)) > + { > + my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), > "groupcommit"); > + MYSQL_YYABORT; > + } > + Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_GROUPCOMMIT; > + } > + | TRANSACTIONAL_SYM > + { > + if (Lex->mi.repl_parallel_mode & > + (SLAVE_PARALLEL_GROUPCOMMIT|SLAVE_PARALLEL_TRX)) > + { > + my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), > "transactional"); > + MYSQL_YYABORT; > + } > + Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_TRX; > + } > + | WAITING_SYM > + { > + if (Lex->mi.repl_parallel_mode & SLAVE_PARALLEL_WAITING) > + { > + my_error(ER_INVALID_SLAVE_PARALLEL_MODE, MYF(0), > "waiting"); > + MYSQL_YYABORT; > + } > + Lex->mi.repl_parallel_mode|= SLAVE_PARALLEL_WAITING; > + } > + > + > master_file_def: > MASTER_LOG_FILE_SYM EQ TEXT_STRING_sys > { > @@ -14098,6 +14156,7 @@ user: user_maybe_role > | DISABLE_SYM {} > | DISCARD {} > | DISK_SYM {} > + | DOMAIN_SYM {} > | DUMPFILE {} > | DUPLICATE_SYM {} > | DYNAMIC_SYM {} > @@ -14131,6 +14190,7 @@ user: user_maybe_role > | GET_FORMAT {} > | GRANTS {} > | GLOBAL_SYM {} > + | GROUPCOMMIT_SYM {} > | HASH_SYM {} > | HARD_SYM {} > | HOSTS_SYM {} > @@ -14221,6 +14281,7 @@ user: user_maybe_role > | ONLY_SYM {} > | PACK_KEYS_SYM {} > | PAGE_SYM {} > + | PARALLEL_MODE_SYM {} > | PARTIAL {} > | PARTITIONING_SYM {} > | PARTITIONS_SYM {} > @@ -14336,6 +14397,7 @@ user: user_maybe_role > | VALUE_SYM {} > | WARNINGS {} > | WAIT_SYM {} > + | WAITING_SYM {} > | WEEK_SYM {} > | WEIGHT_STRING_SYM {} > | WORK_SYM {} > > === modified file 'sql/sys_vars.cc' > --- a/sql/sys_vars.cc 2014-09-19 13:25:37 +0000 > +++ b/sql/sys_vars.cc 2014-10-07 14:20:37 +0000 > @@ -1834,31 +1834,11 @@ static Sys_var_ulong Sys_slave_parallel_ > VALID_RANGE(0,2147483647), DEFAULT(131072), BLOCK_SIZE(1)); > > > -static const char *slave_parallel_mode_names[] = { > - "domain", "groupcommit", "transactional", "waiting", NULL > -}; > - > -static Sys_var_set Sys_slave_parallel_mode( > - "slave_parallel_mode", > - "Controls what transactions are applied in parallel when using " > - "--slave-parallel-threads. Syntax: > slave_paralle_mode=value[,value...], " > - "where \"value\" could be one or more of: \"domain\", to apply " > - "different replication domains in parallel; \"groupcommit\", to > apply " > - "in parallel transactions that group-committed together on the > master; " > - "\"transactional\", to optimistically try to apply all > transactional " > - "DML in parallel; and \"waiting\" to extend \"transactional\" to " > - "even transactions that had to wait on the master.", > - GLOBAL_VAR(opt_slave_parallel_mode), CMD_LINE(REQUIRED_ARG), > - slave_parallel_mode_names, > - DEFAULT(SLAVE_PARALLEL_DOMAIN | > - SLAVE_PARALLEL_GROUPCOMMIT)); > - > - > static Sys_var_bit Sys_replicate_allow_parallel( > "replicate_allow_parallel", > "If set when a transaction is written to the binlog, that > transaction " > - "is allowed to replicate in parallel on a slave where " > - "slave_parallel_mode is set to \"transactional\". Can be cleared > for " > + "is allowed to replicate in parallel on a slave where > parallel_mode " > + "is set to \"transactional\" (in CHANGE MASTER). Can be cleared > for " > "transactions that are likely to cause a conflict if replicated in > " > "parallel, to avoid unnecessary rollback and retry.", > SESSION_ONLY(option_bits), NO_CMD_LINE, OPTION_RPL_ALLOW_PARALLEL, > > _______________________________________________ > commits mailing list > [email protected] > https://lists.askmonty.org/cgi-bin/mailman/listinfo/commits >
_______________________________________________ Mailing list: https://launchpad.net/~maria-developers Post to : [email protected] Unsubscribe : https://launchpad.net/~maria-developers More help : https://help.launchpad.net/ListHelp

