On 1/9/24 23:49, Ilya Maximets wrote:
> Refactoring of the replication code, so each database is handled
> separately from each other. Supposed to work the same way as before
> with the only difference that each backup database will have its own
> connection to the source and will have its own state machine.
>
> From the user's perspective, the only visible difference is that
> ovsdb-server/sync-status appctl now shows the status of each
> database separately.
>
> If one of the connections is permanently broken, all the databases
> will be switched to active. This is done in order to preserve the
> old behavior where we had only one connection.
>
> Signed-off-by: Ilya Maximets <[email protected]>
> ---
Hi Ilya,
> ovsdb/ovsdb-server.c | 74 +++--
> ovsdb/replication.c | 676 ++++++++++++++++++++-----------------------
> ovsdb/replication.h | 36 +--
> 3 files changed, 384 insertions(+), 402 deletions(-)
>
> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
> index 7e95b3813..9a3b0add1 100644
> --- a/ovsdb/ovsdb-server.c
> +++ b/ovsdb/ovsdb-server.c
> @@ -166,12 +166,12 @@ ovsdb_replication_init(const char *sync_from, const
> char *exclude,
> struct shash *all_dbs, const struct uuid *server_uuid,
> int probe_interval)
> {
> - replication_init(sync_from, exclude, server_uuid, probe_interval);
> struct shash_node *node;
> SHASH_FOR_EACH (node, all_dbs) {
> struct db *db = node->data;
> if (node->name[0] != '_' && db->db) {
> - replication_add_local_db(node->name, db->db);
> + replication_set_db(db->db, sync_from, exclude,
> + server_uuid, probe_interval);
> }
> }
> }
> @@ -228,11 +228,20 @@ main_loop(struct server_config *config,
> report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error);
> ovsdb_jsonrpc_server_run(jsonrpc);
>
> + replication_run();
> if (*is_backup) {
> - replication_run();
> - if (!replication_is_alive()) {
> - disconnect_active_server();
> - *is_backup = false;
> + SHASH_FOR_EACH (node, all_dbs) {
> + struct db *db = node->data;
> + if (db->db->name[0] != '_' && !replication_is_alive(db->db))
> {
> + *is_backup = false;
> + break;
> + }
> + }
> + if (!*is_backup) {
> + SHASH_FOR_EACH (node, all_dbs) {
> + struct db *db = node->data;
> + replication_remove_db(db->db);
> + }
> }
> }
>
> @@ -283,10 +292,8 @@ main_loop(struct server_config *config,
> update_server_status(all_dbs);
>
> memory_wait();
> - if (*is_backup) {
> - replication_wait();
> - }
>
> + replication_wait();
> ovsdb_relay_wait();
>
> ovsdb_jsonrpc_server_wait(jsonrpc);
> @@ -518,7 +525,7 @@ main(int argc, char *argv[])
> &server_config);
> unixctl_command_register("ovsdb-server/get-sync-exclude-tables", "",
> 0, 0, ovsdb_server_get_sync_exclude_tables,
> - NULL);
> + &server_config);
> unixctl_command_register("ovsdb-server/sync-status", "",
> 0, 0, ovsdb_server_get_sync_status,
> &server_config);
> @@ -606,6 +613,9 @@ close_db(struct server_config *config, struct db *db,
> char *comment)
> if (db->db->is_relay) {
> ovsdb_relay_del_db(db->db);
> }
> + if (*config->is_backup) {
> + replication_remove_db(db->db);
> + }
> ovsdb_destroy(db->db);
> free(db->filename);
> free(db);
> @@ -1500,8 +1510,12 @@ ovsdb_server_disconnect_active_ovsdb_server(struct
> unixctl_conn *conn,
> void *config_)
> {
> struct server_config *config = config_;
> + struct shash_node *node;
>
> - disconnect_active_server();
> + SHASH_FOR_EACH (node, config->all_dbs) {
> + struct db *db = node->data;
> + replication_remove_db(db->db);
> + }
> *config->is_backup = false;
> save_config(config);
> unixctl_command_reply(conn, NULL);
> @@ -1520,7 +1534,11 @@
> ovsdb_server_set_active_ovsdb_server_probe_interval(struct unixctl_conn *conn,
> *config->replication_probe_interval = probe_interval;
> save_config(config);
> if (*config->is_backup) {
> - replication_set_probe_interval(probe_interval);
> + const struct uuid *server_uuid;
> + server_uuid = ovsdb_jsonrpc_server_get_uuid(config->jsonrpc);
> + ovsdb_replication_init(*config->sync_from, *config->sync_exclude,
> + config->all_dbs, server_uuid,
> + *config->replication_probe_interval);
> }
> unixctl_command_reply(conn, NULL);
> } else {
> @@ -1557,7 +1575,7 @@ ovsdb_server_set_sync_exclude_tables(struct
> unixctl_conn *conn,
> {
> struct server_config *config = config_;
>
> - char *err = set_excluded_tables(argv[1], true);
> + char *err = parse_excluded_tables(argv[1]);
> if (!err) {
> free(*config->sync_exclude);
> *config->sync_exclude = xstrdup(argv[1]);
> @@ -1569,7 +1587,6 @@ ovsdb_server_set_sync_exclude_tables(struct
> unixctl_conn *conn,
> config->all_dbs, server_uuid,
> *config->replication_probe_interval);
> }
> - err = set_excluded_tables(argv[1], false);
> }
> unixctl_command_reply(conn, err);
> free(err);
> @@ -1579,11 +1596,11 @@ static void
> ovsdb_server_get_sync_exclude_tables(struct unixctl_conn *conn,
> int argc OVS_UNUSED,
> const char *argv[] OVS_UNUSED,
> - void *arg_ OVS_UNUSED)
> + void *config_)
> {
> - char *reply = get_excluded_tables();
> - unixctl_command_reply(conn, reply);
> - free(reply);
> + struct server_config *config = config_;
> +
> + unixctl_command_reply(conn, *config->sync_exclude);
> }
>
> static void
> @@ -1842,13 +1859,6 @@ remove_db(struct server_config *config, struct
> shash_node *node, char *comment)
> shash_delete(config->all_dbs, node);
>
> save_config(config);
> - if (*config->is_backup) {
> - const struct uuid *server_uuid;
> - server_uuid = ovsdb_jsonrpc_server_get_uuid(config->jsonrpc);
> - ovsdb_replication_init(*config->sync_from, *config->sync_exclude,
> - config->all_dbs, server_uuid,
> - *config->replication_probe_interval);
> - }
> }
>
> static void
> @@ -1990,7 +2000,17 @@ ovsdb_server_get_sync_status(struct unixctl_conn
> *conn, int argc OVS_UNUSED,
> ds_put_format(&ds, "state: %s\n", is_backup ? "backup" : "active");
>
> if (is_backup) {
> - ds_put_and_free_cstr(&ds, replication_status());
> + const struct shash_node **db_nodes = shash_sort(config->all_dbs);
> +
> + for (size_t i = 0; i < shash_count(config->all_dbs); i++) {
> + const struct db *db = db_nodes[i]->data;
> +
> + if (db->db && db->db->name[0] != '_') {
> + ds_put_and_free_cstr(&ds, replication_status(db->db));
> + ds_put_char(&ds, '\n');
> + }
> + }
> + free(db_nodes);
> }
>
> unixctl_command_reply(conn, ds_cstr(&ds));
> @@ -2154,7 +2174,7 @@ parse_options(int argc, char *argv[],
> break;
>
> case OPT_SYNC_EXCLUDE: {
> - char *err = set_excluded_tables(optarg, false);
> + char *err = parse_excluded_tables(optarg);
> if (err) {
> ovs_fatal(0, "%s", err);
> }
> diff --git a/ovsdb/replication.c b/ovsdb/replication.c
> index 477c69d70..d0d48aad5 100644
> --- a/ovsdb/replication.c
> +++ b/ovsdb/replication.c
> @@ -38,16 +38,7 @@
>
> VLOG_DEFINE_THIS_MODULE(replication);
>
> -static char *sync_from;
> static struct uuid server_uuid;
> -static struct jsonrpc_session *session;
> -static unsigned int session_seqno = UINT_MAX;
> -
> -static struct jsonrpc_msg *create_monitor_request(struct ovsdb_schema *);
> -static void add_monitored_table(struct ovsdb_table_schema *table,
> - struct json *monitor_requests);
> -
> -static struct ovsdb_error *reset_database(struct ovsdb *db);
>
> static struct ovsdb_error *process_notification(struct json *, struct ovsdb
> *);
> static struct ovsdb_error *process_table_update(struct json *table_update,
> @@ -55,27 +46,6 @@ static struct ovsdb_error *process_table_update(struct
> json *table_update,
> struct ovsdb *database,
> struct ovsdb_txn *txn);
>
> -/* Maps from db name to sset of table names. */
> -static struct shash excluded_tables = SHASH_INITIALIZER(&excluded_tables);
> -
> -static void excluded_tables_clear(void);
> -static void excluded_tables_add(const char *database, const char *table);
> -static bool excluded_tables_find(const char *database, const char *table);
> -
> -
> -/* Keep track of request IDs of all outstanding OVSDB requests. */
> -static struct hmap request_ids = HMAP_INITIALIZER(&request_ids);
> -
> -struct request_ids_hmap_node {
> - struct hmap_node hmap;
> - struct json *request_id;
> - struct ovsdb *db; /* associated database */
> -};
> -void request_ids_add(const struct json *id, struct ovsdb *db);
> -bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
> -static void request_ids_destroy(void);
> -void request_ids_clear(void);
> -
> enum ovsdb_replication_state {
> RPL_S_INIT,
> RPL_S_SERVER_ID_REQUESTED,
> @@ -85,168 +55,231 @@ enum ovsdb_replication_state {
> RPL_S_REPLICATING,
> RPL_S_ERR /* Error, no longer replicating. */
> };
> -static enum ovsdb_replication_state state;
>
> -
> struct replication_db {
> struct ovsdb *db;
> +
> bool schema_version_higher;
> /* Points to the schema received from the active server if
> * the local db schema version is higher. NULL otherwise. */
> struct ovsdb_schema *active_db_schema;
> +
> + char *sync_from;
> + char *excluded_tables_str;
> + struct sset excluded_tables;
> +
> + struct json *request_id; /* Id of the outstanding OVSDB request. */
> +
> + struct jsonrpc_session *session;
> + unsigned int session_seqno;
> +
> + enum ovsdb_replication_state state;
> };
>
> static bool is_replication_possible(struct ovsdb_schema *local_db_schema,
> struct ovsdb_schema *active_db_schema);
>
> +static struct jsonrpc_msg *create_monitor_request(struct replication_db *,
> + struct ovsdb_schema *);
> +static void add_monitored_table(struct ovsdb_table_schema *table,
> + struct json *monitor_requests);
> +
> +
> /* All DBs known to ovsdb-server. The actual replication dbs are stored
> * in 'replication dbs', which is a subset of all dbs and remote dbs whose
> * schema matches. */
> -static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
> -static struct shash *replication_dbs;
> +static struct shash replication_dbs = SHASH_INITIALIZER(&replication_dbs);
> +
> +static void replication_db_destroy(struct replication_db *);
> +static struct ovsdb_error *reset_database(struct replication_db *);
>
> -static struct shash *replication_dbs_create(void);
> -static void replication_dbs_destroy(void);
> /* Find 'struct ovsdb' by name within 'replication_dbs' */
> static struct replication_db *find_db(const char *db_name);
> +
> +static char *set_excluded_tables(struct replication_db *, const char
> *excluded)
> + OVS_WARN_UNUSED_RESULT;
> +
> +static void request_id_set(struct replication_db *, const struct json *id);
> +static void request_id_clear(struct replication_db *);
> +static bool request_id_compare_and_free(struct replication_db *,
> + const struct json *id);
>
>
> void
> -replication_init(const char *sync_from_, const char *exclude_tables,
> - const struct uuid *server, int probe_interval)
> +replication_set_db(struct ovsdb *db, const char *sync_from,
> + const char *exclude_tables, const struct uuid *server,
> + int probe_interval)
> {
> - free(sync_from);
> - sync_from = xstrdup(sync_from_);
> - /* Caller should have verified that the 'exclude_tables' is
> - * parseable. An error here is unexpected. */
> - ovs_assert(!set_excluded_tables(exclude_tables, false));
> + struct replication_db *rdb = find_db(db->name);
>
> - replication_dbs_destroy();
> + if (uuid_is_zero(&server_uuid)) {
> + /* Keep a copy of local server uuid. */
> + server_uuid = *server;
> + } else {
> + ovs_assert(uuid_equals(&server_uuid, server));
> + }
> +
> + ovs_assert(sync_from);
> +
> + if (rdb
> + && nullable_string_is_equal(rdb->excluded_tables_str, exclude_tables)
> + && nullable_string_is_equal(rdb->sync_from, sync_from)) {
> + jsonrpc_session_set_probe_interval(rdb->session, probe_interval);
> + return;
> + }
>
> - shash_clear(&local_dbs);
> - if (session) {
> - jsonrpc_session_close(session);
> + if (!rdb) {
> + rdb = xzalloc(sizeof *rdb);
> + rdb->db = db;
> + sset_init(&rdb->excluded_tables);
> + rdb->schema_version_higher = false;
> + shash_add(&replication_dbs, db->name, rdb);
> + } else {
> + replication_db_destroy(rdb);
> }
>
> - session = jsonrpc_session_open(sync_from, true);
> - session_seqno = UINT_MAX;
> + rdb->sync_from = xstrdup(sync_from);
> + rdb->excluded_tables_str = nullable_xstrdup(exclude_tables);
> + /* Caller should have verified that the 'exclude_tables' is
> + * parseable. An error here is unexpected. */
> + ovs_assert(!set_excluded_tables(rdb, exclude_tables));
>
> - jsonrpc_session_set_probe_interval(session, probe_interval);
> + rdb->session = jsonrpc_session_open(rdb->sync_from, true);
> + rdb->session_seqno = UINT_MAX;
>
> - /* Keep a copy of local server uuid. */
> - server_uuid = *server;
> + jsonrpc_session_set_probe_interval(rdb->session, probe_interval);
>
> - state = RPL_S_INIT;
> + rdb->state = RPL_S_INIT;
> }
>
> void
> -replication_add_local_db(const char *database, struct ovsdb *db)
> +replication_remove_db(const struct ovsdb *db)
> {
> - shash_add_assert(&local_dbs, database, db);
> + struct replication_db *rdb;
> +
> + rdb = shash_find_and_delete(&replication_dbs, db->name);
> + if (rdb) {
> + replication_db_destroy(rdb);
> + free(rdb);
> + }
> }
>
> -static void
> -send_schema_requests(const struct json *result)
> +static bool
> +json_array_contains_string(const struct json *js, const char *str)
Doesn't this fit better in json.[ch]?
> {
> - for (size_t i = 0; i < result->array.n; i++) {
> - const struct json *name = result->array.elems[i];
> - if (name->type == JSON_STRING) {
> - /* Send one schema request for each remote DB. */
> - const char *db_name = json_string(name);
> - struct replication_db *rdb = find_db(db_name);
> - if (rdb) {
> - struct jsonrpc_msg *request =
> - jsonrpc_create_request(
> - "get_schema",
> - json_array_create_1(
> - json_string_create(db_name)),
> - NULL);
> -
> - request_ids_add(request->id, rdb->db);
> - jsonrpc_session_send(session, request);
> - }
> + bool found = false;
> +
I guess this comment is more relevant if we move the function to
json.[ch], but should we make sure it is never called on something that
is not an array? E.g.:
ovs_assert(js->type == JSON_ARRAY);
Or, instead of aborting, just return early?
> + for (size_t i = 0; i < js->array.n; i++) {
> + const struct json *elem = js->array.elems[i];
> +
> + if (elem->type == JSON_STRING && !strcmp(json_string(elem), str)) {
> + found = true;
> + break;
> }
> }
> + return found;
> }
>
> -void
> -replication_run(void)
> +static void
> +send_schema_request(struct replication_db *rdb)
> +{
> + struct jsonrpc_msg *request =
> + jsonrpc_create_request(
> + "get_schema",
> + json_array_create_1(json_string_create(rdb->db->name)),
> + NULL);
> +
> + request_id_set(rdb, request->id);
> + jsonrpc_session_send(rdb->session, request);
> +}
> +
> +static void
> +replication_run_db(struct replication_db *rdb)
> {
> - if (!session) {
> + if (!rdb->session) {
> return;
> }
>
> - jsonrpc_session_run(session);
> + jsonrpc_session_run(rdb->session);
>
> - for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
> + for (int i = 0; i < 50; i++) {
> struct jsonrpc_msg *msg;
> unsigned int seqno;
>
> - seqno = jsonrpc_session_get_seqno(session);
> - if (seqno != session_seqno || state == RPL_S_INIT) {
> - session_seqno = seqno;
> - request_ids_clear();
> + if (!jsonrpc_session_is_connected(rdb->session)) {
> + break;
> + }
> +
> + seqno = jsonrpc_session_get_seqno(rdb->session);
> + if (seqno != rdb->session_seqno || rdb->state == RPL_S_INIT) {
> + rdb->session_seqno = seqno;
> + request_id_clear(rdb);
> +
> struct jsonrpc_msg *request;
> request = jsonrpc_create_request("get_server_id",
> json_array_create_empty(),
> NULL);
> - request_ids_add(request->id, NULL);
> - jsonrpc_session_send(session, request);
> + request_id_set(rdb, request->id);
> + jsonrpc_session_send(rdb->session, request);
>
> - state = RPL_S_SERVER_ID_REQUESTED;
> - VLOG_DBG("send server ID request.");
> + rdb->state = RPL_S_SERVER_ID_REQUESTED;
> + VLOG_DBG("%s: send server ID request.", rdb->db->name);
> }
>
> - msg = jsonrpc_session_recv(session);
> + msg = jsonrpc_session_recv(rdb->session);
> if (!msg) {
> continue;
> }
>
> - if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
> + if (msg->type == JSONRPC_NOTIFY && rdb->state != RPL_S_ERR
> && !strcmp(msg->method, "update")) {
> if (msg->params->type == JSON_ARRAY
> && msg->params->array.n == 2
> && msg->params->array.elems[0]->type == JSON_STRING) {
> char *db_name = msg->params->array.elems[0]->string;
> - struct replication_db *rdb = find_db(db_name);
> - if (rdb) {
> +
> + if (!strcmp(db_name, rdb->db->name)) {
> struct ovsdb_error *error;
> error = process_notification(msg->params->array.elems[1],
> rdb->db);
> if (error) {
> ovsdb_error_assert(error);
> - state = RPL_S_ERR;
> + rdb->state = RPL_S_ERR;
> }
> + } else {
> + VLOG_WARN("%s: received update for unexpected database
> %s",
> + rdb->db->name, db_name);
> + rdb->state = RPL_S_ERR;
> }
> }
> } else if (msg->type == JSONRPC_REPLY) {
> - struct replication_db *rdb;
> - struct ovsdb *db;
> - if (!request_ids_lookup_and_free(msg->id, &db)) {
> - VLOG_WARN("received unexpected reply");
> + if (!request_id_compare_and_free(rdb, msg->id)) {
> + VLOG_WARN("%s: received unexpected reply.", rdb->db->name);
> goto next;
> }
>
> - switch (state) {
> + switch (rdb->state) {
> case RPL_S_SERVER_ID_REQUESTED: {
> struct uuid uuid;
> if (msg->result->type != JSON_STRING ||
> !uuid_from_string(&uuid, json_string(msg->result))) {
> struct ovsdb_error *error;
> error = ovsdb_error("get_server_id failed",
> - "Server ID is not valid UUID");
> + "%s: Server ID is not valid UUID",
> + rdb->db->name);
>
> ovsdb_error_assert(error);
> - state = RPL_S_ERR;
> + rdb->state = RPL_S_ERR;
> break;
> }
>
> if (uuid_equals(&uuid, &server_uuid)) {
> struct ovsdb_error *error;
> error = ovsdb_error("Server ID check failed",
> - "Self replicating is not allowed");
> + "%s: Self replicating is not
> allowed",
> + rdb->db->name);
>
> ovsdb_error_assert(error);
> - state = RPL_S_ERR;
> + rdb->state = RPL_S_ERR;
> break;
> }
>
> @@ -254,25 +287,32 @@ replication_run(void)
> request = jsonrpc_create_request("list_dbs",
> json_array_create_empty(),
> NULL);
> - request_ids_add(request->id, NULL);
> - jsonrpc_session_send(session, request);
> + request_id_set(rdb, request->id);
> + jsonrpc_session_send(rdb->session, request);
>
> - replication_dbs_destroy();
> - replication_dbs = replication_dbs_create();
> - state = RPL_S_DB_REQUESTED;
> + rdb->state = RPL_S_DB_REQUESTED;
> break;
> }
> case RPL_S_DB_REQUESTED:
> if (msg->result->type != JSON_ARRAY) {
> struct ovsdb_error *error;
> error = ovsdb_error("list_dbs failed",
> - "list_dbs response is not array");
> + "%s: list_dbs response is not array",
> + rdb->db->name);
> + ovsdb_error_assert(error);
> + rdb->state = RPL_S_ERR;
> + } else if (!json_array_contains_string(msg->result,
> + rdb->db->name)) {
> + struct ovsdb_error *error;
> + error = ovsdb_error("list_dbs failed",
> + "%s: database name is not in the
> list",
> + rdb->db->name);
> ovsdb_error_assert(error);
> - state = RPL_S_ERR;
> + rdb->state = RPL_S_ERR;
> } else {
> - send_schema_requests(msg->result);
> - VLOG_DBG("Send schema requests");
> - state = RPL_S_SCHEMA_REQUESTED;
> + send_schema_request(rdb);
> + VLOG_DBG("%s: send schema request.", rdb->db->name);
> + rdb->state = RPL_S_SCHEMA_REQUESTED;
> }
> break;
>
> @@ -283,19 +323,22 @@ replication_run(void)
> error = ovsdb_schema_from_json(msg->result, &schema);
> if (error) {
> ovsdb_error_assert(error);
> - state = RPL_S_ERR;
> + rdb->state = RPL_S_ERR;
> + break;
> }
>
> - rdb = find_db(schema->name);
> - if (!rdb) {
> + if (strcmp(rdb->db->name, schema->name)) {
> /* Unexpected schema. */
> - VLOG_WARN("unexpected schema %s", schema->name);
> - state = RPL_S_ERR;
> + VLOG_WARN("%s: unexpected schema %s.",
> + rdb->db->name, schema->name);
> + rdb->state = RPL_S_ERR;
> + ovsdb_schema_destroy(schema);
> + break;
> } else if (!ovsdb_schema_equal(schema, rdb->db->schema)) {
> /* Schmea version mismatch. */
> - VLOG_INFO("Schema version mismatch, checking if %s can "
> - "still be replicated or not.",
> - schema->name);
> + VLOG_INFO("%s: Schema version mismatch, checking if %s
> can"
> + " still be replicated or not.",
> + rdb->db->name, schema->name);
> if (is_replication_possible(rdb->db->schema, schema)) {
> VLOG_INFO("%s can be replicated.", schema->name);
> rdb->schema_version_higher = true;
> @@ -305,68 +348,48 @@ replication_run(void)
> rdb->active_db_schema = schema;
> } else {
> VLOG_INFO("%s cannot be replicated.", schema->name);
> - struct replication_db *r =
> - shash_find_and_delete(replication_dbs,
> - schema->name);
> - if (r->active_db_schema) {
> - ovsdb_schema_destroy(r->active_db_schema);
> - }
> - free(r);
> + rdb->state = RPL_S_ERR;
> ovsdb_schema_destroy(schema);
> + break;
> }
> } else {
> ovsdb_schema_destroy(schema);
> }
>
> - /* After receiving schemas, reset the local databases that
> - * will be monitored and send out monitor requests for them.
> */
> - if (hmap_is_empty(&request_ids)) {
> - struct shash_node *node;
> -
> - if (shash_is_empty(replication_dbs)) {
> - VLOG_WARN("Nothing to replicate.");
> - state = RPL_S_ERR;
> - } else {
> - SHASH_FOR_EACH (node, replication_dbs) {
> - rdb = node->data;
> - struct jsonrpc_msg *request =
> - create_monitor_request(
> - rdb->schema_version_higher ?
> - rdb->active_db_schema : rdb->db->schema);
> -
> - request_ids_add(request->id, rdb->db);
> - jsonrpc_session_send(session, request);
> - VLOG_DBG("Send monitor requests");
> - state = RPL_S_MONITOR_REQUESTED;
> - }
> - }
> - }
> + /* Send out a monitor request. */
> + struct jsonrpc_msg *request =
> + create_monitor_request(rdb, rdb->schema_version_higher
> + ? rdb->active_db_schema
> + : rdb->db->schema);
> +
> + request_id_set(rdb, request->id);
> + jsonrpc_session_send(rdb->session, request);
> + VLOG_DBG("%s: send monitor request.", rdb->db->name);
> + rdb->state = RPL_S_MONITOR_REQUESTED;
> break;
> }
>
> case RPL_S_MONITOR_REQUESTED: {
> /* Reply to monitor requests. */
> struct ovsdb_error *error;
> - VLOG_INFO("Monitor request received. Resetting the
> database");
> + VLOG_INFO("%s: Monitor reply received. "
> + "Resetting the database.", rdb->db->name);
> /* Resetting the database here has few risks. If the
> * process_notification() fails, the database is completely
> * lost locally. In case that node becomes active, then
> * there is a chance of complete data loss in the
> active/standy
> * cluster. */
> - error = reset_database(db);
> + error = reset_database(rdb);
> if (!error) {
> - error = process_notification(msg->result, db);
> + error = process_notification(msg->result, rdb->db);
> }
> if (error) {
> ovsdb_error_assert(error);
> - state = RPL_S_ERR;
> + rdb->state = RPL_S_ERR;
> } else {
> - /* Transition to replicating state after receiving
> - * all replies of "monitor" requests. */
> - if (hmap_is_empty(&request_ids)) {
> - VLOG_DBG("Listening to monitor updates");
> - state = RPL_S_REPLICATING;
> - }
> + VLOG_DBG("%s: Listening to monitor updates.",
> + rdb->db->name);
> + rdb->state = RPL_S_REPLICATING;
> }
> break;
> }
> @@ -378,7 +401,7 @@ replication_run(void)
> case RPL_S_INIT:
> case RPL_S_REPLICATING:
> default:
> - OVS_NOT_REACHED();
> + VLOG_WARN("%s: received unexpected reply.", rdb->db->name);
Why do we now just warn here instead of aborting with OVS_NOT_REACHED()?
I assume the chance of getting here is the same as it was before the
patch, right?
> }
> }
> next:
> @@ -386,24 +409,40 @@ replication_run(void)
> }
> }
>
> +void
> +replication_run(void)
> +{
> + struct shash_node *node;
> +
> + SHASH_FOR_EACH (node, &replication_dbs) {
> + replication_run_db(node->data);
> + }
> +}
> +
> void
> replication_wait(void)
> {
> - if (session) {
> - jsonrpc_session_wait(session);
> - jsonrpc_session_recv_wait(session);
> + struct shash_node *node;
> +
> + SHASH_FOR_EACH (node, &replication_dbs) {
> + struct replication_db *rdb = node->data;
> +
> + if (rdb->session) {
> + jsonrpc_session_wait(rdb->session);
> + jsonrpc_session_recv_wait(rdb->session);
> + }
> }
> }
>
> -/* Parse 'excluded' to rebuild 'excluded_tables'. If 'dryrun' is false, the
> - * current set of excluded tables will be wiped out, regardless of whether
> - * 'excluded' can be parsed. If 'dryrun' is true, only parses 'excluded' and
> +/* Parse 'excluded' to rebuild 'rdb->excluded_tables'. If 'rdb' is not NULL,
> + * the current set of excluded tables will be wiped out, regardless of
> whether
> + * 'excluded' can be parsed. If 'rdb' is NULL, only parses 'excluded' and
> * reports any errors, without modifying the list of exclusions.
> *
> - * On error, returns the error string, which the caller is
> - * responsible for freeing. Returns NULL otherwise. */
> -char * OVS_WARN_UNUSED_RESULT
> -set_excluded_tables(const char *excluded, bool dryrun)
> + * On error, returns the error string, which the caller is responsible for
> + * freeing. Returns NULL otherwise. */
> +static char * OVS_WARN_UNUSED_RESULT
> +set_excluded_tables__(struct replication_db *rdb, const char *excluded)
> {
> struct sset set = SSET_INITIALIZER(&set);
> char *err = NULL;
> @@ -411,17 +450,22 @@ set_excluded_tables(const char *excluded, bool dryrun)
> if (excluded) {
> const char *longname;
>
> - if (!dryrun) {
> - /* Can only add to an empty shash. */
> - excluded_tables_clear();
> + if (rdb) {
> + /* Can only add to an empty set. */
> + sset_clear(&rdb->excluded_tables);
> }
>
> sset_from_delimited_string(&set, excluded, " ,");
> SSET_FOR_EACH (longname, &set) {
> + if (rdb && !strchr(longname, ':')) {
> + sset_add(&rdb->excluded_tables, longname);
> + continue;
> + }
> +
> char *database = xstrdup(longname), *table = NULL;
> strtok_r(database, ":", &table);
> - if (table && !dryrun) {
> - excluded_tables_add(database, table);
> + if (table && rdb && !strcmp(rdb->db->name, database)) {
> + sset_add(&rdb->excluded_tables, table);
> }
>
> free(database);
> @@ -434,120 +478,74 @@ set_excluded_tables(const char *excluded, bool dryrun)
>
> done:
> sset_destroy(&set);
> - if (err && !dryrun) {
> + if (err && rdb) {
> /* On error, destroy the partially built 'excluded_tables'. */
> - excluded_tables_clear();
> + sset_clear(&rdb->excluded_tables);
> }
> return err;
> }
>
> char * OVS_WARN_UNUSED_RESULT
> -get_excluded_tables(void)
> +parse_excluded_tables(const char *excluded)
> {
> - struct shash_node *node;
> - struct sset set = SSET_INITIALIZER(&set);
> -
> - SHASH_FOR_EACH (node, &excluded_tables) {
> - const char *database = node->name;
> - const char *table;
> - struct sset *tables = node->data;
> -
> - SSET_FOR_EACH (table, tables) {
> - sset_add_and_free(&set, xasprintf("%s:%s", database, table));
> - }
> - }
> -
> - /* Output the table list in an sorted order, so that
> - * the output string will not depend on the hash function
> - * that used to implement the hmap data structure. This is
> - * only useful for writting unit tests. */
> - const char **sorted = sset_sort(&set);
> - struct ds ds = DS_EMPTY_INITIALIZER;
> - size_t i;
> - for (i = 0; i < sset_count(&set); i++) {
> - ds_put_format(&ds, "%s,", sorted[i]);
> - }
> -
> - ds_chomp(&ds, ',');
> -
> - free(sorted);
> - sset_destroy(&set);
> -
> - return ds_steal_cstr(&ds);
> + return set_excluded_tables__(NULL, excluded);
> }
>
> -static void
> -excluded_tables_clear(void)
> +static char * OVS_WARN_UNUSED_RESULT
> +set_excluded_tables(struct replication_db *rdb, const char *excluded)
> {
> - struct shash_node *node;
> - SHASH_FOR_EACH (node, &excluded_tables) {
> - struct sset *tables = node->data;
> - sset_destroy(tables);
> - }
> -
> - shash_clear_free_data(&excluded_tables);
> + return set_excluded_tables__(rdb, excluded);
> }
>
> -static void
> -excluded_tables_add(const char *database, const char *table)
> +char * OVS_WARN_UNUSED_RESULT
> +get_excluded_tables(const struct ovsdb *db)
> {
> - struct sset *tables = shash_find_data(&excluded_tables, database);
> + const struct replication_db *rdb = find_db(db->name);
>
> - if (!tables) {
> - tables = xmalloc(sizeof *tables);
> - sset_init(tables);
> - shash_add(&excluded_tables, database, tables);
> + if (!rdb) {
> + return xstrdup("");
> }
>
> - sset_add(tables, table);
> -}
> + struct sset set = SSET_INITIALIZER(&set);
> + const char *table;
> + char *result;
>
> -static bool
> -excluded_tables_find(const char *database, const char *table)
> -{
> - struct sset *tables = shash_find_data(&excluded_tables, database);
> - return tables && sset_contains(tables, table);
> -}
> + SSET_FOR_EACH (table, &rdb->excluded_tables) {
> + sset_add_and_free(&set, xasprintf("%s:%s", rdb->db->name, table));
> + }
>
> -void
> -disconnect_active_server(void)
> -{
> - jsonrpc_session_close(session);
> - session = NULL;
> + result = sset_join(&set, ",", "");
> + sset_destroy(&set);
> +
> + return result;
> }
>
> void
> replication_destroy(void)
> {
> - excluded_tables_clear();
> - shash_destroy(&excluded_tables);
> + struct shash_node *node;
>
> - if (sync_from) {
> - free(sync_from);
> - sync_from = NULL;
> + SHASH_FOR_EACH (node, &replication_dbs) {
> + replication_db_destroy(node->data);
> }
> -
> - request_ids_destroy();
> - replication_dbs_destroy();
> -
> - shash_destroy(&local_dbs);
> + shash_destroy_free_data(&replication_dbs);
> }
>
> static struct replication_db *
> find_db(const char *db_name)
> {
> - return shash_find_data(replication_dbs, db_name);
> + return shash_find_data(&replication_dbs, db_name);
> }
>
> static struct ovsdb_error *
> -reset_database(struct ovsdb *db)
> +reset_database(struct replication_db *rdb)
> {
> - struct ovsdb_txn *txn = ovsdb_txn_create(db);
> + struct ovsdb_txn *txn = ovsdb_txn_create(rdb->db);
> struct shash_node *table_node;
>
> - SHASH_FOR_EACH (table_node, &db->tables) {
> + SHASH_FOR_EACH (table_node, &rdb->db->tables) {
> /* Delete all rows if the table is not excluded. */
> - if (!excluded_tables_find(db->schema->name, table_node->name)) {
> + if (!sset_contains(&rdb->excluded_tables, table_node->name)) {
> struct ovsdb_table *table = table_node->data;
> struct ovsdb_row *row;
> HMAP_FOR_EACH_SAFE (row, hmap_node, &table->rows) {
> @@ -565,7 +563,7 @@ reset_database(struct ovsdb *db)
> * Caller is responsible for disposing 'request'.
> */
> static struct jsonrpc_msg *
> -create_monitor_request(struct ovsdb_schema *schema)
> +create_monitor_request(struct replication_db *rdb, struct ovsdb_schema
> *schema)
> {
> struct jsonrpc_msg *request;
> struct json *monitor;
> @@ -579,7 +577,7 @@ create_monitor_request(struct ovsdb_schema *schema)
> struct ovsdb_table_schema *table = nodes[j]->data;
>
> /* Monitor all tables not excluded. */
> - if (!excluded_tables_find(db_name, table->name)) {
> + if (!sset_contains(&rdb->excluded_tables, table->name)) {
> add_monitored_table(table, monitor_request);
> }
> }
> @@ -689,114 +687,76 @@ process_table_update(struct json *table_update, const
> char *table_name,
> return NULL;
> }
>
> -void
> -request_ids_add(const struct json *id, struct ovsdb *db)
> +static void
> +request_id_set(struct replication_db *rdb, const struct json *id)
> {
> - struct request_ids_hmap_node *node = xmalloc(sizeof *node);
> + ovs_assert(!rdb->request_id);
> + rdb->request_id = json_clone(id);
> +}
>
> - node->request_id = json_clone(id);
> - node->db = db;
> - hmap_insert(&request_ids, &node->hmap, json_hash(id, 0));
> +static void
> +request_id_clear(struct replication_db *rdb)
> +{
> + json_destroy(rdb->request_id);
> + rdb->request_id = NULL;
> }
>
> -/* Look up 'id' from 'request_ids', if found, remove the found id from
> - * 'request_ids' and free its memory. If not found, 'request_ids' does
> - * not change. Sets '*db' to the database for the request (NULL if not
> - * found).
> +/* Compare 'id' with sent 'request_id'. If it mtches, clear the current
Typo: "mtches" -> "matches".
> + * 'request_id'. If it doesn't match, 'request_id' does not change.
> *
> - * Return true if 'id' is found, false otherwise.
> + * Return true if 'id' matches, false otherwise.
> */
> -bool
> -request_ids_lookup_and_free(const struct json *id, struct ovsdb **db)
> +static bool
> +request_id_compare_and_free(struct replication_db *rdb, const struct json
> *id)
> {
> - struct request_ids_hmap_node *node;
> -
> - HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) {
> - if (json_equal(id, node->request_id)) {
> - hmap_remove(&request_ids, &node->hmap);
> - *db = node->db;
> - json_destroy(node->request_id);
> - free(node);
> - return true;
> - }
> + if (rdb->request_id && json_equal(id, rdb->request_id)) {
> + request_id_clear(rdb);
> + return true;
> }
> -
> - *db = NULL;
> return false;
> }
>
> static void
> -request_ids_destroy(void)
> +replication_db_destroy(struct replication_db *rdb)
> {
> - struct request_ids_hmap_node *node;
> -
> - HMAP_FOR_EACH_POP (node, hmap, &request_ids) {
> - json_destroy(node->request_id);
> - free(node);
> + if (!rdb) {
> + return;
> }
> - hmap_destroy(&request_ids);
> -}
>
> -void
> -request_ids_clear(void)
> -{
> - request_ids_destroy();
> - hmap_init(&request_ids);
> -}
> + free(rdb->sync_from);
> + rdb->sync_from = NULL;
>
> -static struct shash *
> -replication_dbs_create(void)
> -{
> - struct shash *new = xmalloc(sizeof *new);
> - shash_init(new);
> + free(rdb->excluded_tables_str);
> + rdb->excluded_tables_str = NULL;
> + sset_destroy(&rdb->excluded_tables);
>
> - struct shash_node *node;
> - SHASH_FOR_EACH (node, &local_dbs) {
> - struct replication_db *repl_db = xmalloc(sizeof *repl_db);
> - repl_db->db = node->data;
> - repl_db->schema_version_higher = false;
> - repl_db->active_db_schema = NULL;
> - shash_add(new, node->name, repl_db);
> - }
> + request_id_clear(rdb);
>
> - return new;
> -}
> -
> -static void
> -replication_dbs_destroy(void)
> -{
> - if (!replication_dbs) {
> - return;
> + if (rdb->session) {
> + jsonrpc_session_close(rdb->session);
> + rdb->session = NULL;
> }
>
> - struct shash_node *node;
> -
> - SHASH_FOR_EACH_SAFE (node, replication_dbs) {
> - hmap_remove(&replication_dbs->map, &node->node);
> - struct replication_db *rdb = node->data;
> - if (rdb->active_db_schema) {
> - ovsdb_schema_destroy(rdb->active_db_schema);
> - }
> - free(rdb);
> - free(node->name);
> - free(node);
> + if (rdb->active_db_schema) {
> + ovsdb_schema_destroy(rdb->active_db_schema);
> + rdb->active_db_schema = NULL;
> }
>
> - hmap_destroy(&replication_dbs->map);
> - free(replication_dbs);
> - replication_dbs = NULL;
> + rdb->schema_version_higher = false;
> }
>
> /* Return true if replication just started or is ongoing.
> * Return false if the connection failed, or the replication
> * was not able to start. */
> bool
> -replication_is_alive(void)
> +replication_is_alive(const struct ovsdb *db)
> {
> - if (session) {
> - return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
> + const struct replication_db *rdb = find_db(db->name);
> +
> + if (!rdb || !rdb->session) {
> + return false;
> }
> - return false;
> + return jsonrpc_session_is_alive(rdb->session) && rdb->state != RPL_S_ERR;
> }
>
> /* Return the last error reported on a connection by 'session'. The
> @@ -806,60 +766,60 @@ replication_is_alive(void)
> * Return a negative value if replication session has error, or the
> * replication was not able to start. */
> int
> -replication_get_last_error(void)
> +replication_get_last_error(const struct ovsdb *db)
> {
> + const struct replication_db *rdb = find_db(db->name);
> int err = 0;
>
> - if (session) {
> - err = jsonrpc_session_get_last_error(session);
> + if (rdb && rdb->session) {
> + err = jsonrpc_session_get_last_error(rdb->session);
> if (!err) {
> - err = (state == RPL_S_ERR) ? ENOENT : 0;
> + err = (rdb->state == RPL_S_ERR) ? ENOENT : 0;
> }
> }
>
> return err;
> }
>
> -char *
> -replication_status(void)
> +char * OVS_WARN_UNUSED_RESULT
> +replication_status(const struct ovsdb *db)
> {
> - bool alive = session && jsonrpc_session_is_alive(session);
> + const struct replication_db *rdb = find_db(db->name);
> +
> + if (!rdb) {
> + return xasprintf("%s is not configured for replication", db->name);
> + }
> +
> + bool alive = rdb->session && jsonrpc_session_is_alive(rdb->session);
> struct ds ds = DS_EMPTY_INITIALIZER;
>
> + ds_put_format(&ds, "database: %s\n", db->name);
> if (alive) {
> - switch(state) {
> + switch (rdb->state) {
> case RPL_S_INIT:
> case RPL_S_SERVER_ID_REQUESTED:
> case RPL_S_DB_REQUESTED:
> case RPL_S_SCHEMA_REQUESTED:
> case RPL_S_MONITOR_REQUESTED:
> - ds_put_format(&ds, "connecting: %s", sync_from);
> + ds_put_format(&ds, "connecting: %s", rdb->sync_from);
> break;
> case RPL_S_REPLICATING: {
> - struct shash_node *node;
> -
> - ds_put_format(&ds, "replicating: %s\n", sync_from);
> - ds_put_cstr(&ds, "database:");
> - SHASH_FOR_EACH (node, replication_dbs) {
> - ds_put_format(&ds, " %s,", node->name);
> - }
> - ds_chomp(&ds, ',');
> + ds_put_format(&ds, "replicating: %s\n", rdb->sync_from);
>
> - if (!shash_is_empty(&excluded_tables)) {
> - ds_put_char(&ds, '\n');
> + if (!sset_is_empty(&rdb->excluded_tables)) {
> ds_put_cstr(&ds, "exclude: ");
> - ds_put_and_free_cstr(&ds, get_excluded_tables());
> + ds_put_and_free_cstr(&ds, get_excluded_tables(db));
> }
> break;
> }
> case RPL_S_ERR:
> - ds_put_format(&ds, "Replication to (%s) failed\n", sync_from);
> + ds_put_format(&ds, "Replication to (%s) failed", rdb->sync_from);
> break;
> default:
> OVS_NOT_REACHED();
> }
> } else {
> - ds_put_format(&ds, "not connected to %s", sync_from);
> + ds_put_format(&ds, "not connected to %s", rdb->sync_from);
> }
> return ds_steal_cstr(&ds);
> }
> @@ -913,10 +873,12 @@ is_replication_possible(struct ovsdb_schema
> *local_db_schema,
> }
>
> void
> -replication_set_probe_interval(int probe_interval)
> +replication_set_probe_interval(const struct ovsdb *db, int probe_interval)
> {
> - if (session) {
> - jsonrpc_session_set_probe_interval(session, probe_interval);
> + const struct replication_db *rdb = find_db(db->name);
> +
> + if (rdb && rdb->session) {
> + jsonrpc_session_set_probe_interval(rdb->session, probe_interval);
> }
> }
>
> diff --git a/ovsdb/replication.h b/ovsdb/replication.h
> index 6d1be820f..f5e226753 100644
> --- a/ovsdb/replication.h
> +++ b/ovsdb/replication.h
> @@ -26,41 +26,41 @@ struct ovsdb;
> * API Usage
> *===========
> *
> - * - replication_init() needs to be called whenever OVSDB server switches
> into
> + * - replication_set_db() needs to be called whenever database switches into
> * the backup mode.
> *
> - * - replication_add_local_db() should be called immediately after to add all
> - * known database that OVSDB server owns, one at a time.
> + * - replication_remove_db() needs to be called whenever backup database
> + * switches into an active mode.
> *
> * - replication_destroy() should be called when OVSDB server shutdown to
> * reclaim resources.
> *
> * - replication_run(), replication_wait(), replication_is_alive() and
> * replication_get_last_error() should be call within the main loop
> - * whenever OVSDB server runs in the backup mode.
> + * whenever OVSDB has backup databases.
> *
> - * - set_excluded_tables(), get_excluded_tables(), disconnect_active_server()
> - * and replication_usage() are support functions used mainly by unixctl
> - * commands.
> + * - parse_excluded_tables(), get_excluded_tables() and replication_usage()
> + * are support functions used mainly by unixctl commands.
> */
>
> #define REPLICATION_DEFAULT_PROBE_INTERVAL 60000
>
> -void replication_init(const char *sync_from, const char *exclude_tables,
> - const struct uuid *server, int probe_interval);
> +void replication_set_db(struct ovsdb *, const char *sync_from,
> + const char *exclude_tables, const struct uuid
> *server,
> + int probe_interval);
> +void replication_remove_db(const struct ovsdb *);
> +
> void replication_run(void);
> void replication_wait(void);
> void replication_destroy(void);
> void replication_usage(void);
> -void replication_add_local_db(const char *databse, struct ovsdb *db);
> -bool replication_is_alive(void);
> -int replication_get_last_error(void);
> -char *replication_status(void);
> -void replication_set_probe_interval(int);
> +bool replication_is_alive(const struct ovsdb *);
> +int replication_get_last_error(const struct ovsdb *);
> +char *replication_status(const struct ovsdb *);
> +void replication_set_probe_interval(const struct ovsdb *, int
> probe_interval);
>
> -char *set_excluded_tables(const char *excluded, bool dryrun)
> - OVS_WARN_UNUSED_RESULT;
> -char *get_excluded_tables(void) OVS_WARN_UNUSED_RESULT;
> -void disconnect_active_server(void);
> +char *parse_excluded_tables(const char *excluded) OVS_WARN_UNUSED_RESULT;
> +char *get_excluded_tables(const struct ovsdb *) OVS_WARN_UNUSED_RESULT;
> +void disconnect_active_server(const struct ovsdb *);
This declaration of disconnect_active_server() should be removed too; the API usage comment above no longer mentions it.
Regards,
Dumitru
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev