On 1/15/24 13:45, Ilya Maximets wrote:
> On 1/12/24 16:17, Dumitru Ceara wrote:
>> On 1/9/24 23:49, Ilya Maximets wrote:
>>> Refactoring of the replication code, so each database is handled
>>> separately from each other.  Supposed to work the same way as before
>>> with the only difference that each backup database will have its own
>>> connection to the source and will have its own state machine.
>>>
>>> From the user's perspective, the only visible difference is that
>>> ovsdb-server/sync-status appctl now shows the status of each
>>> database separately.
>>>
>>> If one of the connections is permanently broken, all the databases
>>> will be switched to active.  This is done in order to preserve the
>>> old behavior where we had only one connection.
>>>
>>> Signed-off-by: Ilya Maximets <[email protected]>
>>> ---
>>
>> Hi Ilya,
>>
>>>  ovsdb/ovsdb-server.c |  74 +++--
>>>  ovsdb/replication.c  | 676 ++++++++++++++++++++-----------------------
>>>  ovsdb/replication.h  |  36 +--
>>>  3 files changed, 384 insertions(+), 402 deletions(-)
>>>
>>> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
>>> index 7e95b3813..9a3b0add1 100644
>>> --- a/ovsdb/ovsdb-server.c
>>> +++ b/ovsdb/ovsdb-server.c
>>> @@ -166,12 +166,12 @@ ovsdb_replication_init(const char *sync_from, const 
>>> char *exclude,
>>>                         struct shash *all_dbs, const struct uuid 
>>> *server_uuid,
>>>                         int probe_interval)
>>>  {
>>> -    replication_init(sync_from, exclude, server_uuid, probe_interval);
>>>      struct shash_node *node;
>>>      SHASH_FOR_EACH (node, all_dbs) {
>>>          struct db *db = node->data;
>>>          if (node->name[0] != '_' && db->db) {
>>> -            replication_add_local_db(node->name, db->db);
>>> +            replication_set_db(db->db, sync_from, exclude,
>>> +                               server_uuid, probe_interval);
>>>          }
>>>      }
>>>  }
>>> @@ -228,11 +228,20 @@ main_loop(struct server_config *config,
>>>          report_error_if_changed(reconfigure_ssl(all_dbs), &ssl_error);
>>>          ovsdb_jsonrpc_server_run(jsonrpc);
>>>  
>>> +        replication_run();
>>>          if (*is_backup) {
>>> -            replication_run();
>>> -            if (!replication_is_alive()) {
>>> -                disconnect_active_server();
>>> -                *is_backup = false;
>>> +            SHASH_FOR_EACH (node, all_dbs) {
>>> +                struct db *db = node->data;
>>> +                if (db->db->name[0] != '_' && 
>>> !replication_is_alive(db->db)) {
>>> +                    *is_backup = false;
>>> +                    break;
>>> +                }
>>> +            }
>>> +            if (!*is_backup) {
>>> +                SHASH_FOR_EACH (node, all_dbs) {
>>> +                    struct db *db = node->data;
>>> +                    replication_remove_db(db->db);
>>> +                }
>>>              }
>>>          }
>>>  
>>> @@ -283,10 +292,8 @@ main_loop(struct server_config *config,
>>>          update_server_status(all_dbs);
>>>  
>>>          memory_wait();
>>> -        if (*is_backup) {
>>> -            replication_wait();
>>> -        }
>>>  
>>> +        replication_wait();
>>>          ovsdb_relay_wait();
>>>  
>>>          ovsdb_jsonrpc_server_wait(jsonrpc);
>>> @@ -518,7 +525,7 @@ main(int argc, char *argv[])
>>>                               &server_config);
>>>      unixctl_command_register("ovsdb-server/get-sync-exclude-tables", "",
>>>                               0, 0, ovsdb_server_get_sync_exclude_tables,
>>> -                             NULL);
>>> +                             &server_config);
>>>      unixctl_command_register("ovsdb-server/sync-status", "",
>>>                               0, 0, ovsdb_server_get_sync_status,
>>>                               &server_config);
>>> @@ -606,6 +613,9 @@ close_db(struct server_config *config, struct db *db, 
>>> char *comment)
>>>          if (db->db->is_relay) {
>>>              ovsdb_relay_del_db(db->db);
>>>          }
>>> +        if (*config->is_backup) {
>>> +            replication_remove_db(db->db);
>>> +        }
>>>          ovsdb_destroy(db->db);
>>>          free(db->filename);
>>>          free(db);
>>> @@ -1500,8 +1510,12 @@ ovsdb_server_disconnect_active_ovsdb_server(struct 
>>> unixctl_conn *conn,
>>>                                              void *config_)
>>>  {
>>>      struct server_config *config = config_;
>>> +    struct shash_node *node;
>>>  
>>> -    disconnect_active_server();
>>> +    SHASH_FOR_EACH (node, config->all_dbs) {
>>> +        struct db *db = node->data;
>>> +        replication_remove_db(db->db);
>>> +    }
>>>      *config->is_backup = false;
>>>      save_config(config);
>>>      unixctl_command_reply(conn, NULL);
>>> @@ -1520,7 +1534,11 @@ 
>>> ovsdb_server_set_active_ovsdb_server_probe_interval(struct unixctl_conn 
>>> *conn,
>>>          *config->replication_probe_interval = probe_interval;
>>>          save_config(config);
>>>          if (*config->is_backup) {
>>> -            replication_set_probe_interval(probe_interval);
>>> +            const struct uuid *server_uuid;
>>> +            server_uuid = ovsdb_jsonrpc_server_get_uuid(config->jsonrpc);
>>> +            ovsdb_replication_init(*config->sync_from, 
>>> *config->sync_exclude,
>>> +                                   config->all_dbs, server_uuid,
>>> +                                   *config->replication_probe_interval);
>>>          }
>>>          unixctl_command_reply(conn, NULL);
>>>      } else {
>>> @@ -1557,7 +1575,7 @@ ovsdb_server_set_sync_exclude_tables(struct 
>>> unixctl_conn *conn,
>>>  {
>>>      struct server_config *config = config_;
>>>  
>>> -    char *err = set_excluded_tables(argv[1], true);
>>> +    char *err = parse_excluded_tables(argv[1]);
>>>      if (!err) {
>>>          free(*config->sync_exclude);
>>>          *config->sync_exclude = xstrdup(argv[1]);
>>> @@ -1569,7 +1587,6 @@ ovsdb_server_set_sync_exclude_tables(struct 
>>> unixctl_conn *conn,
>>>                                     config->all_dbs, server_uuid,
>>>                                     *config->replication_probe_interval);
>>>          }
>>> -        err = set_excluded_tables(argv[1], false);
>>>      }
>>>      unixctl_command_reply(conn, err);
>>>      free(err);
>>> @@ -1579,11 +1596,11 @@ static void
>>>  ovsdb_server_get_sync_exclude_tables(struct unixctl_conn *conn,
>>>                                       int argc OVS_UNUSED,
>>>                                       const char *argv[] OVS_UNUSED,
>>> -                                     void *arg_ OVS_UNUSED)
>>> +                                     void *config_)
>>>  {
>>> -    char *reply = get_excluded_tables();
>>> -    unixctl_command_reply(conn, reply);
>>> -    free(reply);
>>> +    struct server_config *config = config_;
>>> +
>>> +    unixctl_command_reply(conn, *config->sync_exclude);
>>>  }
>>>  
>>>  static void
>>> @@ -1842,13 +1859,6 @@ remove_db(struct server_config *config, struct 
>>> shash_node *node, char *comment)
>>>      shash_delete(config->all_dbs, node);
>>>  
>>>      save_config(config);
>>> -    if (*config->is_backup) {
>>> -        const struct uuid *server_uuid;
>>> -        server_uuid = ovsdb_jsonrpc_server_get_uuid(config->jsonrpc);
>>> -        ovsdb_replication_init(*config->sync_from, *config->sync_exclude,
>>> -                               config->all_dbs, server_uuid,
>>> -                               *config->replication_probe_interval);
>>> -    }
>>>  }
>>>  
>>>  static void
>>> @@ -1990,7 +2000,17 @@ ovsdb_server_get_sync_status(struct unixctl_conn 
>>> *conn, int argc OVS_UNUSED,
>>>      ds_put_format(&ds, "state: %s\n", is_backup ? "backup" : "active");
>>>  
>>>      if (is_backup) {
>>> -        ds_put_and_free_cstr(&ds, replication_status());
>>> +        const struct shash_node **db_nodes = shash_sort(config->all_dbs);
>>> +
>>> +        for (size_t i = 0; i < shash_count(config->all_dbs); i++) {
>>> +            const struct db *db = db_nodes[i]->data;
>>> +
>>> +            if (db->db && db->db->name[0] != '_') {
>>> +                ds_put_and_free_cstr(&ds, replication_status(db->db));
>>> +                ds_put_char(&ds, '\n');
>>> +            }
>>> +        }
>>> +        free(db_nodes);
>>>      }
>>>  
>>>      unixctl_command_reply(conn, ds_cstr(&ds));
>>> @@ -2154,7 +2174,7 @@ parse_options(int argc, char *argv[],
>>>              break;
>>>  
>>>          case OPT_SYNC_EXCLUDE: {
>>> -            char *err = set_excluded_tables(optarg, false);
>>> +            char *err = parse_excluded_tables(optarg);
>>>              if (err) {
>>>                  ovs_fatal(0, "%s", err);
>>>              }
>>> diff --git a/ovsdb/replication.c b/ovsdb/replication.c
>>> index 477c69d70..d0d48aad5 100644
>>> --- a/ovsdb/replication.c
>>> +++ b/ovsdb/replication.c
>>> @@ -38,16 +38,7 @@
>>>  
>>>  VLOG_DEFINE_THIS_MODULE(replication);
>>>  
>>> -static char *sync_from;
>>>  static struct uuid server_uuid;
>>> -static struct jsonrpc_session *session;
>>> -static unsigned int session_seqno = UINT_MAX;
>>> -
>>> -static struct jsonrpc_msg *create_monitor_request(struct ovsdb_schema *);
>>> -static void add_monitored_table(struct ovsdb_table_schema *table,
>>> -                                struct json *monitor_requests);
>>> -
>>> -static struct ovsdb_error *reset_database(struct ovsdb *db);
>>>  
>>>  static struct ovsdb_error *process_notification(struct json *, struct 
>>> ovsdb *);
>>>  static struct ovsdb_error *process_table_update(struct json *table_update,
>>> @@ -55,27 +46,6 @@ static struct ovsdb_error *process_table_update(struct 
>>> json *table_update,
>>>                                                  struct ovsdb *database,
>>>                                                  struct ovsdb_txn *txn);
>>>  
>>> -/* Maps from db name to sset of table names. */
>>> -static struct shash excluded_tables = SHASH_INITIALIZER(&excluded_tables);
>>> -
>>> -static void excluded_tables_clear(void);
>>> -static void excluded_tables_add(const char *database, const char *table);
>>> -static bool excluded_tables_find(const char *database, const char *table);
>>> -
>>> -
>>> -/* Keep track of request IDs of all outstanding OVSDB requests. */
>>> -static struct hmap request_ids = HMAP_INITIALIZER(&request_ids);
>>> -
>>> -struct request_ids_hmap_node {
>>> -    struct hmap_node hmap;
>>> -    struct json *request_id;
>>> -    struct ovsdb *db;          /* associated database */
>>> -};
>>> -void request_ids_add(const struct json *id, struct ovsdb *db);
>>> -bool request_ids_lookup_and_free(const struct json *id, struct ovsdb **db);
>>> -static void request_ids_destroy(void);
>>> -void request_ids_clear(void);
>>> -
>>>  enum ovsdb_replication_state {
>>>      RPL_S_INIT,
>>>      RPL_S_SERVER_ID_REQUESTED,
>>> @@ -85,168 +55,231 @@ enum ovsdb_replication_state {
>>>      RPL_S_REPLICATING,
>>>      RPL_S_ERR /* Error, no longer replicating. */
>>>  };
>>> -static enum ovsdb_replication_state state;
>>>  
>>> -
>>>  struct replication_db {
>>>      struct ovsdb *db;
>>> +
>>>      bool schema_version_higher;
>>>       /* Points to the schema received from the active server if
>>>        * the local db schema version is higher. NULL otherwise. */
>>>      struct ovsdb_schema *active_db_schema;
>>> +
>>> +    char *sync_from;
>>> +    char *excluded_tables_str;
>>> +    struct sset excluded_tables;
>>> +
>>> +    struct json *request_id;  /* Id of the outstanding OVSDB request. */
>>> +
>>> +    struct jsonrpc_session *session;
>>> +    unsigned int session_seqno;
>>> +
>>> +    enum ovsdb_replication_state state;
>>>  };
>>>  
>>>  static bool is_replication_possible(struct ovsdb_schema *local_db_schema,
>>>                                      struct ovsdb_schema *active_db_schema);
>>>  
>>> +static struct jsonrpc_msg *create_monitor_request(struct replication_db *,
>>> +                                                  struct ovsdb_schema *);
>>> +static void add_monitored_table(struct ovsdb_table_schema *table,
>>> +                                struct json *monitor_requests);
>>> +
>>> +
>>>  /* All DBs known to ovsdb-server.  The actual replication dbs are stored
>>>   * in 'replication dbs', which is a subset of all dbs and remote dbs whose
>>>   * schema matches.  */
>>> -static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs);
>>> -static struct shash *replication_dbs;
>>> +static struct shash replication_dbs = SHASH_INITIALIZER(&replication_dbs);
>>> +
>>> +static void replication_db_destroy(struct replication_db *);
>>> +static struct ovsdb_error *reset_database(struct replication_db *);
>>>  
>>> -static struct shash *replication_dbs_create(void);
>>> -static void replication_dbs_destroy(void);
>>>  /* Find 'struct ovsdb' by name within 'replication_dbs' */
>>>  static struct replication_db *find_db(const char *db_name);
>>> +
>>> +static char *set_excluded_tables(struct replication_db *, const char 
>>> *excluded)
>>> +    OVS_WARN_UNUSED_RESULT;
>>> +
>>> +static void request_id_set(struct replication_db *, const struct json *id);
>>> +static void request_id_clear(struct replication_db *);
>>> +static bool request_id_compare_and_free(struct replication_db *,
>>> +                                        const struct json *id);
>>>  
>>>  
>>>  void
>>> -replication_init(const char *sync_from_, const char *exclude_tables,
>>> -                 const struct uuid *server, int probe_interval)
>>> +replication_set_db(struct ovsdb *db, const char *sync_from,
>>> +                   const char *exclude_tables, const struct uuid *server,
>>> +                   int probe_interval)
>>>  {
>>> -    free(sync_from);
>>> -    sync_from = xstrdup(sync_from_);
>>> -    /* Caller should have verified that the 'exclude_tables' is
>>> -     * parseable. An error here is unexpected. */
>>> -    ovs_assert(!set_excluded_tables(exclude_tables, false));
>>> +    struct replication_db *rdb = find_db(db->name);
>>>  
>>> -    replication_dbs_destroy();
>>> +    if (uuid_is_zero(&server_uuid)) {
>>> +        /* Keep a copy of local server uuid.  */
>>> +        server_uuid = *server;
>>> +    } else {
>>> +        ovs_assert(uuid_equals(&server_uuid, server));
>>> +    }
>>> +
>>> +    ovs_assert(sync_from);
>>> +
>>> +    if (rdb
>>> +        && nullable_string_is_equal(rdb->excluded_tables_str, 
>>> exclude_tables)
>>> +        && nullable_string_is_equal(rdb->sync_from, sync_from)) {
>>> +        jsonrpc_session_set_probe_interval(rdb->session, probe_interval);
>>> +        return;
>>> +    }
>>>  
>>> -    shash_clear(&local_dbs);
>>> -    if (session) {
>>> -        jsonrpc_session_close(session);
>>> +    if (!rdb) {
>>> +        rdb = xzalloc(sizeof *rdb);
>>> +        rdb->db = db;
>>> +        sset_init(&rdb->excluded_tables);
>>> +        rdb->schema_version_higher = false;
>>> +        shash_add(&replication_dbs, db->name, rdb);
>>> +    } else {
>>> +        replication_db_destroy(rdb);
>>>      }
>>>  
>>> -    session = jsonrpc_session_open(sync_from, true);
>>> -    session_seqno = UINT_MAX;
>>> +    rdb->sync_from = xstrdup(sync_from);
>>> +    rdb->excluded_tables_str = nullable_xstrdup(exclude_tables);
>>> +    /* Caller should have verified that the 'exclude_tables' is
>>> +     * parseable. An error here is unexpected. */
>>> +    ovs_assert(!set_excluded_tables(rdb, exclude_tables));
>>>  
>>> -    jsonrpc_session_set_probe_interval(session, probe_interval);
>>> +    rdb->session = jsonrpc_session_open(rdb->sync_from, true);
>>> +    rdb->session_seqno = UINT_MAX;
>>>  
>>> -    /* Keep a copy of local server uuid.  */
>>> -    server_uuid = *server;
>>> +    jsonrpc_session_set_probe_interval(rdb->session, probe_interval);
>>>  
>>> -    state = RPL_S_INIT;
>>> +    rdb->state = RPL_S_INIT;
>>>  }
>>>  
>>>  void
>>> -replication_add_local_db(const char *database, struct ovsdb *db)
>>> +replication_remove_db(const struct ovsdb *db)
>>>  {
>>> -    shash_add_assert(&local_dbs, database, db);
>>> +    struct replication_db *rdb;
>>> +
>>> +    rdb = shash_find_and_delete(&replication_dbs, db->name);
>>> +    if (rdb) {
>>> +        replication_db_destroy(rdb);
>>> +        free(rdb);
>>> +    }
>>>  }
>>>  
>>> -static void
>>> -send_schema_requests(const struct json *result)
>>> +static bool
>>> +json_array_contains_string(const struct json *js, const char *str)
>>
>> Doesn't this fit better in json.[ch]?
> 
> I can move it, sure.
> 
>>
>>>  {
>>> -    for (size_t i = 0; i < result->array.n; i++) {
>>> -        const struct json *name = result->array.elems[i];
>>> -        if (name->type == JSON_STRING) {
>>> -            /* Send one schema request for each remote DB. */
>>> -            const char *db_name = json_string(name);
>>> -            struct replication_db *rdb = find_db(db_name);
>>> -            if (rdb) {
>>> -                struct jsonrpc_msg *request =
>>> -                    jsonrpc_create_request(
>>> -                        "get_schema",
>>> -                        json_array_create_1(
>>> -                            json_string_create(db_name)),
>>> -                        NULL);
>>> -
>>> -                request_ids_add(request->id, rdb->db);
>>> -                jsonrpc_session_send(session, request);
>>> -            }
>>> +    bool found = false;
>>> +
>>
>> I guess this comment is more relevant if we move the function to
>> json.[ch], but should we ensure we don't use this incorrectly?  E.g.:
>>
>> ovs_assert(json->type == JSON_ARRAY);
>>
>> Or without aborting and just returning early?
> 
> An assertion should be fine, because json_array_* functions
> should not be called for non-arrays, i.e. caller should
> have checked.
> 
>>
>>> +    for (size_t i = 0; i < js->array.n; i++) {
>>> +        const struct json *elem = js->array.elems[i];
>>> +
>>> +        if (elem->type == JSON_STRING && !strcmp(json_string(elem), str)) {
>>> +            found = true;
>>> +            break;
>>>          }
>>>      }
>>> +    return found;
>>>  }
>>>  
>>> -void
>>> -replication_run(void)
>>> +static void
>>> +send_schema_request(struct replication_db *rdb)
>>> +{
>>> +    struct jsonrpc_msg *request =
>>> +        jsonrpc_create_request(
>>> +                "get_schema",
>>> +                json_array_create_1(json_string_create(rdb->db->name)),
>>> +                NULL);
>>> +
>>> +    request_id_set(rdb, request->id);
>>> +    jsonrpc_session_send(rdb->session, request);
>>> +}
>>> +
>>> +static void
>>> +replication_run_db(struct replication_db *rdb)
>>>  {
>>> -    if (!session) {
>>> +    if (!rdb->session) {
>>>          return;
>>>      }
>>>  
>>> -    jsonrpc_session_run(session);
>>> +    jsonrpc_session_run(rdb->session);
>>>  
>>> -    for (int i = 0; jsonrpc_session_is_connected(session) && i < 50; i++) {
>>> +    for (int i = 0; i < 50; i++) {
>>>          struct jsonrpc_msg *msg;
>>>          unsigned int seqno;
>>>  
>>> -        seqno = jsonrpc_session_get_seqno(session);
>>> -        if (seqno != session_seqno || state == RPL_S_INIT) {
>>> -            session_seqno = seqno;
>>> -            request_ids_clear();
>>> +        if (!jsonrpc_session_is_connected(rdb->session)) {
>>> +            break;
>>> +        }
>>> +
>>> +        seqno = jsonrpc_session_get_seqno(rdb->session);
>>> +        if (seqno != rdb->session_seqno || rdb->state == RPL_S_INIT) {
>>> +            rdb->session_seqno = seqno;
>>> +            request_id_clear(rdb);
>>> +
>>>              struct jsonrpc_msg *request;
>>>              request = jsonrpc_create_request("get_server_id",
>>>                                               json_array_create_empty(), 
>>> NULL);
>>> -            request_ids_add(request->id, NULL);
>>> -            jsonrpc_session_send(session, request);
>>> +            request_id_set(rdb, request->id);
>>> +            jsonrpc_session_send(rdb->session, request);
>>>  
>>> -            state = RPL_S_SERVER_ID_REQUESTED;
>>> -            VLOG_DBG("send server ID request.");
>>> +            rdb->state = RPL_S_SERVER_ID_REQUESTED;
>>> +            VLOG_DBG("%s: send server ID request.", rdb->db->name);
>>>          }
>>>  
>>> -        msg = jsonrpc_session_recv(session);
>>> +        msg = jsonrpc_session_recv(rdb->session);
>>>          if (!msg) {
>>>              continue;
>>>          }
>>>  
>>> -        if (msg->type == JSONRPC_NOTIFY && state != RPL_S_ERR
>>> +        if (msg->type == JSONRPC_NOTIFY && rdb->state != RPL_S_ERR
>>>              && !strcmp(msg->method, "update")) {
>>>              if (msg->params->type == JSON_ARRAY
>>>                  && msg->params->array.n == 2
>>>                  && msg->params->array.elems[0]->type == JSON_STRING) {
>>>                  char *db_name = msg->params->array.elems[0]->string;
>>> -                struct replication_db *rdb = find_db(db_name);
>>> -                if (rdb) {
>>> +
>>> +                if (!strcmp(db_name, rdb->db->name)) {
>>>                      struct ovsdb_error *error;
>>>                      error = 
>>> process_notification(msg->params->array.elems[1],
>>>                                                   rdb->db);
>>>                      if (error) {
>>>                          ovsdb_error_assert(error);
>>> -                        state = RPL_S_ERR;
>>> +                        rdb->state = RPL_S_ERR;
>>>                      }
>>> +                } else {
>>> +                    VLOG_WARN("%s: received update for unexpected database 
>>> %s",
>>> +                              rdb->db->name, db_name);
>>> +                    rdb->state = RPL_S_ERR;
>>>                  }
>>>              }
>>>          } else if (msg->type == JSONRPC_REPLY) {
>>> -            struct replication_db *rdb;
>>> -            struct ovsdb *db;
>>> -            if (!request_ids_lookup_and_free(msg->id, &db)) {
>>> -                VLOG_WARN("received unexpected reply");
>>> +            if (!request_id_compare_and_free(rdb, msg->id)) {
>>> +                VLOG_WARN("%s: received unexpected reply.", rdb->db->name);
>>>                  goto next;
>>>              }
>>>  
>>> -            switch (state) {
>>> +            switch (rdb->state) {
>>>              case RPL_S_SERVER_ID_REQUESTED: {
>>>                  struct uuid uuid;
>>>                  if (msg->result->type != JSON_STRING ||
>>>                      !uuid_from_string(&uuid, json_string(msg->result))) {
>>>                      struct ovsdb_error *error;
>>>                      error = ovsdb_error("get_server_id failed",
>>> -                                        "Server ID is not valid UUID");
>>> +                                        "%s: Server ID is not valid UUID",
>>> +                                        rdb->db->name);
>>>  
>>>                      ovsdb_error_assert(error);
>>> -                    state = RPL_S_ERR;
>>> +                    rdb->state = RPL_S_ERR;
>>>                      break;
>>>                  }
>>>  
>>>                  if (uuid_equals(&uuid, &server_uuid)) {
>>>                      struct ovsdb_error *error;
>>>                      error = ovsdb_error("Server ID check failed",
>>> -                                        "Self replicating is not allowed");
>>> +                                        "%s: Self replicating is not 
>>> allowed",
>>> +                                        rdb->db->name);
>>>  
>>>                      ovsdb_error_assert(error);
>>> -                    state = RPL_S_ERR;
>>> +                    rdb->state = RPL_S_ERR;
>>>                      break;
>>>                  }
>>>  
>>> @@ -254,25 +287,32 @@ replication_run(void)
>>>                  request = jsonrpc_create_request("list_dbs",
>>>                                                   json_array_create_empty(),
>>>                                                   NULL);
>>> -                request_ids_add(request->id, NULL);
>>> -                jsonrpc_session_send(session, request);
>>> +                request_id_set(rdb, request->id);
>>> +                jsonrpc_session_send(rdb->session, request);
>>>  
>>> -                replication_dbs_destroy();
>>> -                replication_dbs = replication_dbs_create();
>>> -                state = RPL_S_DB_REQUESTED;
>>> +                rdb->state = RPL_S_DB_REQUESTED;
>>>                  break;
>>>              }
>>>              case RPL_S_DB_REQUESTED:
>>>                  if (msg->result->type != JSON_ARRAY) {
>>>                      struct ovsdb_error *error;
>>>                      error = ovsdb_error("list_dbs failed",
>>> -                                        "list_dbs response is not array");
>>> +                                        "%s: list_dbs response is not 
>>> array",
>>> +                                        rdb->db->name);
>>> +                    ovsdb_error_assert(error);
>>> +                    rdb->state = RPL_S_ERR;
>>> +                } else if (!json_array_contains_string(msg->result,
>>> +                            rdb->db->name)) {
>>> +                    struct ovsdb_error *error;
>>> +                    error = ovsdb_error("list_dbs failed",
>>> +                                        "%s: database name is not in the 
>>> list",
>>> +                                        rdb->db->name);
>>>                      ovsdb_error_assert(error);
>>> -                    state = RPL_S_ERR;
>>> +                    rdb->state = RPL_S_ERR;
>>>                  } else {
>>> -                    send_schema_requests(msg->result);
>>> -                    VLOG_DBG("Send schema requests");
>>> -                    state = RPL_S_SCHEMA_REQUESTED;
>>> +                    send_schema_request(rdb);
>>> +                    VLOG_DBG("%s: send schema request.", rdb->db->name);
>>> +                    rdb->state = RPL_S_SCHEMA_REQUESTED;
>>>                  }
>>>                  break;
>>>  
>>> @@ -283,19 +323,22 @@ replication_run(void)
>>>                  error = ovsdb_schema_from_json(msg->result, &schema);
>>>                  if (error) {
>>>                      ovsdb_error_assert(error);
>>> -                    state = RPL_S_ERR;
>>> +                    rdb->state = RPL_S_ERR;
>>> +                    break;
>>>                  }
>>>  
>>> -                rdb = find_db(schema->name);
>>> -                if (!rdb) {
>>> +                if (strcmp(rdb->db->name, schema->name)) {
>>>                      /* Unexpected schema. */
>>> -                    VLOG_WARN("unexpected schema %s", schema->name);
>>> -                    state = RPL_S_ERR;
>>> +                    VLOG_WARN("%s: unexpected schema %s.",
>>> +                              rdb->db->name, schema->name);
>>> +                    rdb->state = RPL_S_ERR;
>>> +                    ovsdb_schema_destroy(schema);
>>> +                    break;
>>>                  } else if (!ovsdb_schema_equal(schema, rdb->db->schema)) {
>>>                      /* Schmea version mismatch. */
>>> -                    VLOG_INFO("Schema version mismatch, checking if %s can 
>>> "
>>> -                              "still be replicated or not.",
>>> -                              schema->name);
>>> +                    VLOG_INFO("%s: Schema version mismatch, checking if %s 
>>> can"
>>> +                              " still be replicated or not.",
>>> +                              rdb->db->name, schema->name);
>>>                      if (is_replication_possible(rdb->db->schema, schema)) {
>>>                          VLOG_INFO("%s can be replicated.", schema->name);
>>>                          rdb->schema_version_higher = true;
>>> @@ -305,68 +348,48 @@ replication_run(void)
>>>                          rdb->active_db_schema = schema;
>>>                      } else {
>>>                          VLOG_INFO("%s cannot be replicated.", 
>>> schema->name);
>>> -                        struct replication_db *r =
>>> -                            shash_find_and_delete(replication_dbs,
>>> -                                                  schema->name);
>>> -                        if (r->active_db_schema) {
>>> -                            ovsdb_schema_destroy(r->active_db_schema);
>>> -                        }
>>> -                        free(r);
>>> +                        rdb->state = RPL_S_ERR;
>>>                          ovsdb_schema_destroy(schema);
>>> +                        break;
>>>                      }
>>>                  } else {
>>>                      ovsdb_schema_destroy(schema);
>>>                  }
>>>  
>>> -                /* After receiving schemas, reset the local databases that
>>> -                 * will be monitored and send out monitor requests for 
>>> them. */
>>> -                if (hmap_is_empty(&request_ids)) {
>>> -                    struct shash_node *node;
>>> -
>>> -                    if (shash_is_empty(replication_dbs)) {
>>> -                        VLOG_WARN("Nothing to replicate.");
>>> -                        state = RPL_S_ERR;
>>> -                    } else {
>>> -                        SHASH_FOR_EACH (node, replication_dbs) {
>>> -                            rdb = node->data;
>>> -                            struct jsonrpc_msg *request =
>>> -                                create_monitor_request(
>>> -                                    rdb->schema_version_higher ?
>>> -                                    rdb->active_db_schema : 
>>> rdb->db->schema);
>>> -
>>> -                            request_ids_add(request->id, rdb->db);
>>> -                            jsonrpc_session_send(session, request);
>>> -                            VLOG_DBG("Send monitor requests");
>>> -                            state = RPL_S_MONITOR_REQUESTED;
>>> -                        }
>>> -                    }
>>> -                }
>>> +                /* Send out a monitor request. */
>>> +                struct jsonrpc_msg *request =
>>> +                    create_monitor_request(rdb, rdb->schema_version_higher
>>> +                                                ? rdb->active_db_schema
>>> +                                                : rdb->db->schema);
>>> +
>>> +                request_id_set(rdb, request->id);
>>> +                jsonrpc_session_send(rdb->session, request);
>>> +                VLOG_DBG("%s: send monitor request.", rdb->db->name);
>>> +                rdb->state = RPL_S_MONITOR_REQUESTED;
>>>                  break;
>>>              }
>>>  
>>>              case RPL_S_MONITOR_REQUESTED: {
>>>                  /* Reply to monitor requests. */
>>>                  struct ovsdb_error *error;
>>> -                VLOG_INFO("Monitor request received. Resetting the 
>>> database");
>>> +                VLOG_INFO("%s: Monitor reply received. "
>>> +                          "Resetting the database.", rdb->db->name);
>>>                  /* Resetting the database here has few risks. If the
>>>                   * process_notification() fails, the database is completely
>>>                   * lost locally. In case that node becomes active, then
>>>                   * there is a chance of complete data loss in the 
>>> active/standy
>>>                   * cluster. */
>>> -                error = reset_database(db);
>>> +                error = reset_database(rdb);
>>>                  if (!error) {
>>> -                    error = process_notification(msg->result, db);
>>> +                    error = process_notification(msg->result, rdb->db);
>>>                  }
>>>                  if (error) {
>>>                      ovsdb_error_assert(error);
>>> -                    state = RPL_S_ERR;
>>> +                    rdb->state = RPL_S_ERR;
>>>                  } else {
>>> -                    /* Transition to replicating state after receiving
>>> -                     * all replies of "monitor" requests. */
>>> -                    if (hmap_is_empty(&request_ids)) {
>>> -                        VLOG_DBG("Listening to monitor updates");
>>> -                        state = RPL_S_REPLICATING;
>>> -                    }
>>> +                    VLOG_DBG("%s: Listening to monitor updates.",
>>> +                             rdb->db->name);
>>> +                    rdb->state = RPL_S_REPLICATING;
>>>                  }
>>>                  break;
>>>              }
>>> @@ -378,7 +401,7 @@ replication_run(void)
>>>              case RPL_S_INIT:
>>>              case RPL_S_REPLICATING:
>>>              default:
>>> -                OVS_NOT_REACHED();
>>> +                VLOG_WARN("%s: received unexpected reply.", rdb->db->name);
>>
>> Why do we just warn instead of aborting?  I assume the chance to get
>> here is as high as it was before the patch, right?
> 
> True.  I can bring back the OVS_NOT_REACHED().
> For us to get here we need to receive an expected reply while
> we're in an unexpected state.  That should not happen, since
> the expected request id is always cleared on reply.  And we do
> not transition into new states until that reply is received.
> 
> 
>>
>>>              }
>>>          }
>>>      next:
>>> @@ -386,24 +409,40 @@ replication_run(void)
>>>      }
>>>  }
>>>  
>>> +void
>>> +replication_run(void)
>>> +{
>>> +    struct shash_node *node;
>>> +
>>> +    SHASH_FOR_EACH (node, &replication_dbs) {
>>> +        replication_run_db(node->data);
>>> +    }
>>> +}
>>> +
>>>  void
>>>  replication_wait(void)
>>>  {
>>> -    if (session) {
>>> -        jsonrpc_session_wait(session);
>>> -        jsonrpc_session_recv_wait(session);
>>> +    struct shash_node *node;
>>> +
>>> +    SHASH_FOR_EACH (node, &replication_dbs) {
>>> +        struct replication_db *rdb = node->data;
>>> +
>>> +        if (rdb->session) {
>>> +            jsonrpc_session_wait(rdb->session);
>>> +            jsonrpc_session_recv_wait(rdb->session);
>>> +        }
>>>      }
>>>  }
>>>  
>>> -/* Parse 'excluded' to rebuild 'excluded_tables'.  If 'dryrun' is false, the
>>> - * current set of excluded tables will be wiped out, regardless of whether
>>> - * 'excluded' can be parsed.  If 'dryrun' is true, only parses 'excluded' and
>>> +/* Parse 'excluded' to rebuild 'rdb->excluded_tables'.  If 'rdb' is not NULL,
>>> + * the current set of excluded tables will be wiped out, regardless of whether
>>> + * 'excluded' can be parsed.  If 'rdb' is NULL, only parses 'excluded' and
>>>   * reports any errors, without modifying the list of exclusions.
>>>   *
>>> - * On error, returns the error string, which the caller is
>>> - * responsible for freeing. Returns NULL otherwise. */
>>> -char * OVS_WARN_UNUSED_RESULT
>>> -set_excluded_tables(const char *excluded, bool dryrun)
>>> + * On error, returns the error string, which the caller is responsible for
>>> + * freeing.  Returns NULL otherwise. */
>>> +static char * OVS_WARN_UNUSED_RESULT
>>> +set_excluded_tables__(struct replication_db *rdb, const char *excluded)
>>>  {
>>>      struct sset set = SSET_INITIALIZER(&set);
>>>      char *err = NULL;
>>> @@ -411,17 +450,22 @@ set_excluded_tables(const char *excluded, bool dryrun)
>>>      if (excluded) {
>>>          const char *longname;
>>>  
>>> -        if (!dryrun) {
>>> -            /* Can only add to an empty shash. */
>>> -            excluded_tables_clear();
>>> +        if (rdb) {
>>> +            /* Can only add to an empty set. */
>>> +            sset_clear(&rdb->excluded_tables);
>>>          }
>>>  
>>>          sset_from_delimited_string(&set, excluded, " ,");
>>>          SSET_FOR_EACH (longname, &set) {
>>> +            if (rdb && !strchr(longname, ':')) {
>>> +                sset_add(&rdb->excluded_tables, longname);
>>> +                continue;
>>> +            }
>>> +
>>>              char *database = xstrdup(longname), *table = NULL;
>>>              strtok_r(database, ":", &table);
>>> -            if (table && !dryrun) {
>>> -                excluded_tables_add(database, table);
>>> +            if (table && rdb && !strcmp(rdb->db->name, database)) {
>>> +                sset_add(&rdb->excluded_tables, table);
>>>              }
>>>  
>>>              free(database);
>>> @@ -434,120 +478,74 @@ set_excluded_tables(const char *excluded, bool dryrun)
>>>  
>>>  done:
>>>      sset_destroy(&set);
>>> -    if (err && !dryrun) {
>>> +    if (err && rdb) {
>>>          /* On error, destroy the partially built 'excluded_tables'. */
>>> -        excluded_tables_clear();
>>> +        sset_clear(&rdb->excluded_tables);
>>>      }
>>>      return err;
>>>  }
>>>  
>>>  char * OVS_WARN_UNUSED_RESULT
>>> -get_excluded_tables(void)
>>> +parse_excluded_tables(const char *excluded)
>>>  {
>>> -    struct shash_node *node;
>>> -    struct sset set = SSET_INITIALIZER(&set);
>>> -
>>> -    SHASH_FOR_EACH (node, &excluded_tables) {
>>> -        const char *database = node->name;
>>> -        const char *table;
>>> -        struct sset *tables = node->data;
>>> -
>>> -        SSET_FOR_EACH (table, tables) {
>>> -            sset_add_and_free(&set, xasprintf("%s:%s", database, table));
>>> -        }
>>> -    }
>>> -
>>> -    /* Output the table list in an sorted order, so that
>>> -     * the output string will not depend on the hash function
>>> -     * that used to implement the hmap data structure. This is
>>> -     * only useful for writting unit tests.  */
>>> -    const char **sorted = sset_sort(&set);
>>> -    struct ds ds = DS_EMPTY_INITIALIZER;
>>> -    size_t i;
>>> -    for (i = 0; i < sset_count(&set); i++) {
>>> -        ds_put_format(&ds, "%s,", sorted[i]);
>>> -    }
>>> -
>>> -    ds_chomp(&ds, ',');
>>> -
>>> -    free(sorted);
>>> -    sset_destroy(&set);
>>> -
>>> -    return ds_steal_cstr(&ds);
>>> +    return set_excluded_tables__(NULL, excluded);
>>>  }
>>>  
>>> -static void
>>> -excluded_tables_clear(void)
>>> +static char * OVS_WARN_UNUSED_RESULT
>>> +set_excluded_tables(struct replication_db *rdb, const char *excluded)
>>>  {
>>> -    struct shash_node *node;
>>> -    SHASH_FOR_EACH (node, &excluded_tables) {
>>> -        struct sset *tables = node->data;
>>> -        sset_destroy(tables);
>>> -    }
>>> -
>>> -    shash_clear_free_data(&excluded_tables);
>>> +    return set_excluded_tables__(rdb, excluded);
>>>  }
>>>  
>>> -static void
>>> -excluded_tables_add(const char *database, const char *table)
>>> +char * OVS_WARN_UNUSED_RESULT
>>> +get_excluded_tables(const struct ovsdb *db)
>>>  {
>>> -    struct sset *tables = shash_find_data(&excluded_tables, database);
>>> +    const struct replication_db *rdb = find_db(db->name);
>>>  
>>> -    if (!tables) {
>>> -        tables = xmalloc(sizeof *tables);
>>> -        sset_init(tables);
>>> -        shash_add(&excluded_tables, database, tables);
>>> +    if (!rdb) {
>>> +        return xstrdup("");
>>>      }
>>>  
>>> -    sset_add(tables, table);
>>> -}
>>> +    struct sset set = SSET_INITIALIZER(&set);
>>> +    const char *table;
>>> +    char *result;
>>>  
>>> -static bool
>>> -excluded_tables_find(const char *database, const char *table)
>>> -{
>>> -    struct sset *tables = shash_find_data(&excluded_tables, database);
>>> -    return tables && sset_contains(tables, table);
>>> -}
>>> +    SSET_FOR_EACH (table, &rdb->excluded_tables) {
>>> +        sset_add_and_free(&set, xasprintf("%s:%s", rdb->db->name, table));
>>> +    }
>>>  
>>> -void
>>> -disconnect_active_server(void)
>>> -{
>>> -    jsonrpc_session_close(session);
>>> -    session = NULL;
>>> +    result = sset_join(&set, ",", "");
>>> +    sset_destroy(&set);
>>> +
>>> +    return result;
>>>  }
>>>  
>>>  void
>>>  replication_destroy(void)
>>>  {
>>> -    excluded_tables_clear();
>>> -    shash_destroy(&excluded_tables);
>>> +    struct shash_node *node;
>>>  
>>> -    if (sync_from) {
>>> -        free(sync_from);
>>> -        sync_from = NULL;
>>> +    SHASH_FOR_EACH (node, &replication_dbs) {
>>> +        replication_db_destroy(node->data);
>>>      }
>>> -
>>> -    request_ids_destroy();
>>> -    replication_dbs_destroy();
>>> -
>>> -    shash_destroy(&local_dbs);
>>> +    shash_destroy_free_data(&replication_dbs);
>>>  }
>>>  
>>>  static struct replication_db *
>>>  find_db(const char *db_name)
>>>  {
>>> -    return shash_find_data(replication_dbs, db_name);
>>> +    return shash_find_data(&replication_dbs, db_name);
>>>  }
>>>  
>>>  static struct ovsdb_error *
>>> -reset_database(struct ovsdb *db)
>>> +reset_database(struct replication_db *rdb)
>>>  {
>>> -    struct ovsdb_txn *txn = ovsdb_txn_create(db);
>>> +    struct ovsdb_txn *txn = ovsdb_txn_create(rdb->db);
>>>      struct shash_node *table_node;
>>>  
>>> -    SHASH_FOR_EACH (table_node, &db->tables) {
>>> +    SHASH_FOR_EACH (table_node, &rdb->db->tables) {
>>>          /* Delete all rows if the table is not excluded. */
>>> -        if (!excluded_tables_find(db->schema->name, table_node->name)) {
>>> +        if (!sset_contains(&rdb->excluded_tables, table_node->name)) {
>>>              struct ovsdb_table *table = table_node->data;
>>>              struct ovsdb_row *row;
>>>              HMAP_FOR_EACH_SAFE (row, hmap_node, &table->rows) {
>>> @@ -565,7 +563,7 @@ reset_database(struct ovsdb *db)
>>>   * Caller is responsible for disposing 'request'.
>>>   */
>>>  static struct jsonrpc_msg *
>>> -create_monitor_request(struct ovsdb_schema *schema)
>>> +create_monitor_request(struct replication_db *rdb, struct ovsdb_schema *schema)
>>>  {
>>>      struct jsonrpc_msg *request;
>>>      struct json *monitor;
>>> @@ -579,7 +577,7 @@ create_monitor_request(struct ovsdb_schema *schema)
>>>          struct ovsdb_table_schema *table = nodes[j]->data;
>>>  
>>>          /* Monitor all tables not excluded. */
>>> -        if (!excluded_tables_find(db_name, table->name)) {
>>> +        if (!sset_contains(&rdb->excluded_tables, table->name)) {
>>>              add_monitored_table(table, monitor_request);
>>>          }
>>>      }
>>> @@ -689,114 +687,76 @@ process_table_update(struct json *table_update, const char *table_name,
>>>      return NULL;
>>>  }
>>>  
>>> -void
>>> -request_ids_add(const struct json *id, struct ovsdb *db)
>>> +static void
>>> +request_id_set(struct replication_db *rdb, const struct json *id)
>>>  {
>>> -    struct request_ids_hmap_node *node = xmalloc(sizeof *node);
>>> +    ovs_assert(!rdb->request_id);
>>> +    rdb->request_id = json_clone(id);
>>> +}
>>>  
>>> -    node->request_id = json_clone(id);
>>> -    node->db = db;
>>> -    hmap_insert(&request_ids, &node->hmap, json_hash(id, 0));
>>> +static void
>>> +request_id_clear(struct replication_db *rdb)
>>> +{
>>> +    json_destroy(rdb->request_id);
>>> +    rdb->request_id = NULL;
>>>  }
>>>  
>>> -/* Look up 'id' from 'request_ids', if found, remove the found id from
>>> - * 'request_ids' and free its memory. If not found, 'request_ids' does
>>> - * not change.  Sets '*db' to the database for the request (NULL if not
>>> - * found).
>>> +/* Compare 'id' with sent 'request_id'.  If it mtches, clear the current
>>
>> Typo: mtches
>>
>>> + * 'request_id'.  If it doesn't match, 'request_id' does not change.
>>>   *
>>> - * Return true if 'id' is found, false otherwise.
>>> + * Return true if 'id' matches, false otherwise.
>>>   */
>>> -bool
>>> -request_ids_lookup_and_free(const struct json *id, struct ovsdb **db)
>>> +static bool
>>> +request_id_compare_and_free(struct replication_db *rdb, const struct json *id)
>>>  {
>>> -    struct request_ids_hmap_node *node;
>>> -
>>> -    HMAP_FOR_EACH_WITH_HASH (node, hmap, json_hash(id, 0), &request_ids) {
>>> -        if (json_equal(id, node->request_id)) {
>>> -            hmap_remove(&request_ids, &node->hmap);
>>> -            *db = node->db;
>>> -            json_destroy(node->request_id);
>>> -            free(node);
>>> -            return true;
>>> -        }
>>> +    if (rdb->request_id && json_equal(id, rdb->request_id)) {
>>> +        request_id_clear(rdb);
>>> +        return true;
>>>      }
>>> -
>>> -    *db = NULL;
>>>      return false;
>>>  }
>>>  
>>>  static void
>>> -request_ids_destroy(void)
>>> +replication_db_destroy(struct replication_db *rdb)
>>>  {
>>> -    struct request_ids_hmap_node *node;
>>> -
>>> -    HMAP_FOR_EACH_POP (node, hmap, &request_ids) {
>>> -        json_destroy(node->request_id);
>>> -        free(node);
>>> +    if (!rdb) {
>>> +        return;
>>>      }
>>> -    hmap_destroy(&request_ids);
>>> -}
>>>  
>>> -void
>>> -request_ids_clear(void)
>>> -{
>>> -    request_ids_destroy();
>>> -    hmap_init(&request_ids);
>>> -}
>>> +    free(rdb->sync_from);
>>> +    rdb->sync_from = NULL;
>>>  
>>> -static struct shash *
>>> -replication_dbs_create(void)
>>> -{
>>> -    struct shash *new = xmalloc(sizeof *new);
>>> -    shash_init(new);
>>> +    free(rdb->excluded_tables_str);
>>> +    rdb->excluded_tables_str = NULL;
>>> +    sset_destroy(&rdb->excluded_tables);
>>>  
>>> -    struct shash_node *node;
>>> -    SHASH_FOR_EACH (node, &local_dbs) {
>>> -        struct replication_db *repl_db = xmalloc(sizeof *repl_db);
>>> -        repl_db->db = node->data;
>>> -        repl_db->schema_version_higher = false;
>>> -        repl_db->active_db_schema = NULL;
>>> -        shash_add(new, node->name, repl_db);
>>> -    }
>>> +    request_id_clear(rdb);
>>>  
>>> -    return new;
>>> -}
>>> -
>>> -static void
>>> -replication_dbs_destroy(void)
>>> -{
>>> -    if (!replication_dbs) {
>>> -        return;
>>> +    if (rdb->session) {
>>> +        jsonrpc_session_close(rdb->session);
>>> +        rdb->session = NULL;
>>>      }
>>>  
>>> -    struct shash_node *node;
>>> -
>>> -    SHASH_FOR_EACH_SAFE (node, replication_dbs) {
>>> -        hmap_remove(&replication_dbs->map, &node->node);
>>> -        struct replication_db *rdb = node->data;
>>> -        if (rdb->active_db_schema) {
>>> -            ovsdb_schema_destroy(rdb->active_db_schema);
>>> -        }
>>> -        free(rdb);
>>> -        free(node->name);
>>> -        free(node);
>>> +    if (rdb->active_db_schema) {
>>> +        ovsdb_schema_destroy(rdb->active_db_schema);
>>> +        rdb->active_db_schema = NULL;
>>>      }
>>>  
>>> -    hmap_destroy(&replication_dbs->map);
>>> -    free(replication_dbs);
>>> -    replication_dbs = NULL;
>>> +    rdb->schema_version_higher = false;
>>>  }
>>>  
>>>  /* Return true if replication just started or is ongoing.
>>>   * Return false if the connection failed, or the replication
>>>   * was not able to start. */
>>>  bool
>>> -replication_is_alive(void)
>>> +replication_is_alive(const struct ovsdb *db)
>>>  {
>>> -    if (session) {
>>> -        return jsonrpc_session_is_alive(session) && state != RPL_S_ERR;
>>> +    const struct replication_db *rdb = find_db(db->name);
>>> +
>>> +    if (!rdb || !rdb->session) {
>>> +        return false;
>>>      }
>>> -    return false;
>>> +    return jsonrpc_session_is_alive(rdb->session) && rdb->state != RPL_S_ERR;
>>>  }
>>>  
>>>  /* Return the last error reported on a connection by 'session'. The
>>> @@ -806,60 +766,60 @@ replication_is_alive(void)
>>>   * Return a negative value if replication session has error, or the
>>>   * replication was not able to start.  */
>>>  int
>>> -replication_get_last_error(void)
>>> +replication_get_last_error(const struct ovsdb *db)
>>>  {
>>> +    const struct replication_db *rdb = find_db(db->name);
>>>      int err = 0;
>>>  
>>> -    if (session) {
>>> -        err = jsonrpc_session_get_last_error(session);
>>> +    if (rdb && rdb->session) {
>>> +        err = jsonrpc_session_get_last_error(rdb->session);
>>>          if (!err) {
>>> -            err = (state == RPL_S_ERR) ? ENOENT : 0;
>>> +            err = (rdb->state == RPL_S_ERR) ? ENOENT : 0;
>>>          }
>>>      }
>>>  
>>>      return err;
>>>  }
>>>  
>>> -char *
>>> -replication_status(void)
>>> +char * OVS_WARN_UNUSED_RESULT
>>> +replication_status(const struct ovsdb *db)
>>>  {
>>> -    bool alive = session && jsonrpc_session_is_alive(session);
>>> +    const struct replication_db *rdb = find_db(db->name);
>>> +
>>> +    if (!rdb) {
>>> +        return xasprintf("%s is not configured for replication", db->name);
>>> +    }
>>> +
>>> +    bool alive = rdb->session && jsonrpc_session_is_alive(rdb->session);
>>>      struct ds ds = DS_EMPTY_INITIALIZER;
>>>  
>>> +    ds_put_format(&ds, "database: %s\n", db->name);
>>>      if (alive) {
>>> -        switch(state) {
>>> +        switch (rdb->state) {
>>>          case RPL_S_INIT:
>>>          case RPL_S_SERVER_ID_REQUESTED:
>>>          case RPL_S_DB_REQUESTED:
>>>          case RPL_S_SCHEMA_REQUESTED:
>>>          case RPL_S_MONITOR_REQUESTED:
>>> -            ds_put_format(&ds, "connecting: %s", sync_from);
>>> +            ds_put_format(&ds, "connecting: %s", rdb->sync_from);
>>>              break;
>>>          case RPL_S_REPLICATING: {
>>> -            struct shash_node *node;
>>> -
>>> -            ds_put_format(&ds, "replicating: %s\n", sync_from);
>>> -            ds_put_cstr(&ds, "database:");
>>> -            SHASH_FOR_EACH (node, replication_dbs) {
>>> -                ds_put_format(&ds, " %s,", node->name);
>>> -            }
>>> -            ds_chomp(&ds, ',');
>>> +            ds_put_format(&ds, "replicating: %s\n", rdb->sync_from);
>>>  
>>> -            if (!shash_is_empty(&excluded_tables)) {
>>> -                ds_put_char(&ds, '\n');
>>> +            if (!sset_is_empty(&rdb->excluded_tables)) {
>>>                  ds_put_cstr(&ds, "exclude: ");
>>> -                ds_put_and_free_cstr(&ds, get_excluded_tables());
>>> +                ds_put_and_free_cstr(&ds, get_excluded_tables(db));
>>>              }
>>>              break;
>>>          }
>>>          case RPL_S_ERR:
>>> -            ds_put_format(&ds, "Replication to (%s) failed\n", sync_from);
>>> +            ds_put_format(&ds, "Replication to (%s) failed", rdb->sync_from);
>>>              break;
>>>          default:
>>>              OVS_NOT_REACHED();
>>>          }
>>>      } else {
>>> -        ds_put_format(&ds, "not connected to %s", sync_from);
>>> +        ds_put_format(&ds, "not connected to %s", rdb->sync_from);
>>>      }
>>>      return ds_steal_cstr(&ds);
>>>  }
>>> @@ -913,10 +873,12 @@ is_replication_possible(struct ovsdb_schema *local_db_schema,
>>>  }
>>>  
>>>  void
>>> -replication_set_probe_interval(int probe_interval)
>>> +replication_set_probe_interval(const struct ovsdb *db, int probe_interval)
>>>  {
>>> -    if (session) {
>>> -        jsonrpc_session_set_probe_interval(session, probe_interval);
>>> +    const struct replication_db *rdb = find_db(db->name);
>>> +
>>> +    if (rdb && rdb->session) {
>>> +        jsonrpc_session_set_probe_interval(rdb->session, probe_interval);
>>>      }
>>>  }
>>>  
>>> diff --git a/ovsdb/replication.h b/ovsdb/replication.h
>>> index 6d1be820f..f5e226753 100644
>>> --- a/ovsdb/replication.h
>>> +++ b/ovsdb/replication.h
>>> @@ -26,41 +26,41 @@ struct ovsdb;
>>>   * API Usage
>>>   *===========
>>>   *
>>> - * - replication_init() needs to be called whenever OVSDB server switches into
>>> + * - replication_set_db() needs to be called whenever database switches into
>>>   *   the backup mode.
>>>   *
>>> - * - replication_add_local_db() should be called immediately after to add all
>>> - *   known database that OVSDB server owns, one at a time.
>>> + * - replication_remove_db() needs to be called whenever backup database
>>> + *   switches into an active mode.
>>>   *
>>>   * - replication_destroy() should be called when OVSDB server shutdown to
>>>   *   reclaim resources.
>>>   *
>>>   * - replication_run(), replication_wait(), replication_is_alive() and
>>>   *   replication_get_last_error() should be call within the main loop
>>> - *   whenever OVSDB server runs in the backup mode.
>>> + *   whenever OVSDB has backup databases.
>>>   *
>>> - * - set_excluded_tables(), get_excluded_tables(), disconnect_active_server()
>>> - *   and replication_usage() are support functions used mainly by unixctl
>>> - *   commands.
>>> + * - parse_excluded_tables(), get_excluded_tables() and replication_usage()
>>> + *   are support functions used mainly by unixctl commands.
>>>   */
>>>  
>>>  #define REPLICATION_DEFAULT_PROBE_INTERVAL 60000
>>>  
>>> -void replication_init(const char *sync_from, const char *exclude_tables,
>>> -                      const struct uuid *server, int probe_interval);
>>> +void replication_set_db(struct ovsdb *, const char *sync_from,
>>> +                        const char *exclude_tables, const struct uuid *server,
>>> +                        int probe_interval);
>>> +void replication_remove_db(const struct ovsdb *);
>>> +
>>>  void replication_run(void);
>>>  void replication_wait(void);
>>>  void replication_destroy(void);
>>>  void replication_usage(void);
>>> -void replication_add_local_db(const char *databse, struct ovsdb *db);
>>> -bool replication_is_alive(void);
>>> -int replication_get_last_error(void);
>>> -char *replication_status(void);
>>> -void replication_set_probe_interval(int);
>>> +bool replication_is_alive(const struct ovsdb *);
>>> +int replication_get_last_error(const struct ovsdb *);
>>> +char *replication_status(const struct ovsdb *);
>>> +void replication_set_probe_interval(const struct ovsdb *, int probe_interval);
>>>  
>>> -char *set_excluded_tables(const char *excluded, bool dryrun)
>>> -    OVS_WARN_UNUSED_RESULT;
>>> -char *get_excluded_tables(void) OVS_WARN_UNUSED_RESULT;
>>> -void disconnect_active_server(void);
>>> +char *parse_excluded_tables(const char *excluded) OVS_WARN_UNUSED_RESULT;
>>> +char *get_excluded_tables(const struct ovsdb *) OVS_WARN_UNUSED_RESULT;
>>> +void disconnect_active_server(const struct ovsdb *);
>>
>> This should be removed too.
> 
> Good catch.  Thanks!
> 

OK, I don't think there's a need for v3 in order to address all the
minor issues above.  If you're ok with fixing them up at apply time then
feel free to add my ack:

Acked-by: Dumitru Ceara <[email protected]>

Thanks,
Dumitru

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to