From: Numan Siddique <[email protected]> The present code resets the database in the 'RPL_S_SCHEMA_REQUESTED' state and repopulates it only when the monitor reply is received in the 'RPL_S_MONITOR_REQUESTED' state. However, if the server switches to active mode before it processes the monitor reply, all of the data is lost.
This patch fixes the issue by: - removing the code that resets the database in the 'RPL_S_SCHEMA_REQUESTED' state; - storing the result of process_notification() for the monitor reply in memory (in an unbacked 'struct ovsdb' object); - resetting the database; and - repopulating the database from that in-memory copy. This approach uses extra memory and is not very efficient, but it guarantees that the data is safe. An alternative approach is to reset the database when the monitor reply is received (just before processing it), so that the reset and the repopulation of the database happen in the same state. This is simpler, but it leaves a small window for data loss if process_notification() fails while processing the monitor reply. Reported-by: Han Zhou <[email protected]> Reported-at: https://mail.openvswitch.org/pipermail/ovs-discuss/2018-August/047161.html Signed-off-by: Numan Siddique <[email protected]> --- ovsdb/replication.c | 137 +++++++++++++++++++++++++++++++++++++------- 1 file changed, 115 insertions(+), 22 deletions(-) diff --git a/ovsdb/replication.c b/ovsdb/replication.c index 2b9ae2f83..1a648b899 100644 --- a/ovsdb/replication.c +++ b/ovsdb/replication.c @@ -30,6 +30,7 @@ #include "replication.h" #include "row.h" #include "sset.h" +#include "storage.h" #include "stream.h" #include "svec.h" #include "table.h" @@ -105,11 +106,18 @@ static enum ovsdb_replication_state state; * schema matches.
*/ static struct shash local_dbs = SHASH_INITIALIZER(&local_dbs); static struct shash *replication_dbs; +static struct shash unbacked_dbs = SHASH_INITIALIZER(&unbacked_dbs); static struct shash *replication_db_clone(struct shash *dbs); static void replication_dbs_destroy(void); /* Find 'struct ovsdb' by name within 'replication_dbs' */ static struct ovsdb* find_db(const char *db_name); + +static void create_unbacked_db(struct ovsdb *db); +static void unbacked_dbs_destroy(void); +static struct ovsdb *find_unbacked_db(const char *db_name); +static struct ovsdb_error *clone_from_unbacked_database( + struct ovsdb *db, struct ovsdb *unbacked_db); void @@ -123,7 +131,7 @@ replication_init(const char *sync_from_, const char *exclude_tables, ovs_assert(!set_blacklist_tables(exclude_tables, false)); replication_dbs_destroy(); - + unbacked_dbs_destroy(); shash_clear(&local_dbs); if (session) { jsonrpc_session_close(session); @@ -299,20 +307,7 @@ replication_run(void) /* After receiving schemas, reset the local databases that * will be monitored and send out monitor requests for them. */ if (hmap_is_empty(&request_ids)) { - struct shash_node *node, *next; - - SHASH_FOR_EACH_SAFE (node, next, replication_dbs) { - db = node->data; - error = reset_database(db); - if (error) { - const char *db_name = db->schema->name; - shash_find_and_delete(replication_dbs, db_name); - ovsdb_error_assert(error); - VLOG_WARN("Failed to reset database, " - "%s not replicated.", db_name); - } - } - + struct shash_node *node; if (shash_is_empty(replication_dbs)) { VLOG_WARN("Nothing to replicate."); state = RPL_S_ERR; @@ -321,7 +316,7 @@ replication_run(void) db = node->data; struct jsonrpc_msg *request = create_monitor_request(db); - + create_unbacked_db(db); request_ids_add(request->id, db); jsonrpc_session_send(session, request); VLOG_DBG("Send monitor requests"); @@ -335,18 +330,50 @@ replication_run(void) case RPL_S_MONITOR_REQUESTED: { /* Reply to monitor requests.
*/ struct ovsdb_error *error; - error = process_notification(msg->result, db); + struct ovsdb *unbacked_db = find_unbacked_db(db->name); + error = process_notification(msg->result, unbacked_db); if (error) { ovsdb_error_assert(error); state = RPL_S_ERR; - } else { - /* Transition to replicating state after receiving - * all replies of "monitor" requests. */ - if (hmap_is_empty(&request_ids)) { + break; + } + if (!hmap_is_empty(&request_ids)) { + break; + } + if (shash_is_empty(replication_dbs)) { + VLOG_WARN("Nothing to replicate."); + state = RPL_S_ERR; + break; + } + + struct shash_node *node; + SHASH_FOR_EACH (node, replication_dbs) { + db = node->data; + /* Reset the local database, then refill it from the + * unbacked in-memory copy built from the monitor reply. */ + error = reset_database(db); + if (error) { + const char *db_name = db->schema->name; + shash_find_and_delete(replication_dbs, db_name); + ovsdb_error_assert(error); + VLOG_WARN("Failed to reset database, " + "%s not replicated.", db_name); + state = RPL_S_ERR; + break; + } + unbacked_db = find_unbacked_db(db->name); + error = clone_from_unbacked_database(db, unbacked_db); + if (error) { + ovsdb_error_assert(error); + VLOG_WARN("Failed to set database, " + "%s not replicated.", db->name); + state = RPL_S_ERR; + } else if (state != RPL_S_ERR) { VLOG_DBG("Listening to monitor updates"); state = RPL_S_REPLICATING; } } + unbacked_dbs_destroy(); break; } @@ -508,7 +535,7 @@ replication_destroy(void) request_ids_destroy(); replication_dbs_destroy(); - + unbacked_dbs_destroy(); shash_destroy(&local_dbs); } @@ -517,6 +544,37 @@ find_db(const char *db_name) { return shash_find_data(replication_dbs, db_name); } + +static void +create_unbacked_db(struct ovsdb *db) +{ + struct ovsdb *unbacked_db = ovsdb_create(ovsdb_schema_clone(db->schema), + ovsdb_storage_create_unbacked()); + struct shash_node *node = shash_find(&unbacked_dbs, db->name); + if (node) { + ovsdb_destroy(node->data); + shash_delete(&unbacked_dbs, node); + } + shash_add_assert(&unbacked_dbs, db->name,
unbacked_db); +} + +static void +unbacked_dbs_destroy(void) +{ + struct shash_node *node; + + SHASH_FOR_EACH (node, &unbacked_dbs) { + ovsdb_destroy(node->data); + } + shash_destroy(&unbacked_dbs); + shash_init(&unbacked_dbs); /* Reused after this call. */ +} + +static struct ovsdb * +find_unbacked_db(const char *db_name) +{ + return shash_find_data(&unbacked_dbs, db_name); +} static struct ovsdb_error * reset_database(struct ovsdb *db) @@ -538,6 +596,41 @@ reset_database(struct ovsdb *db) return ovsdb_txn_propose_commit_block(txn, false); } +/* Copies every row of 'unbacked_db' into 'db', preserving row UUIDs. */ +static struct ovsdb_error * +clone_from_unbacked_database(struct ovsdb *db, struct ovsdb *unbacked_db) +{ + struct ovsdb_txn *txn = ovsdb_txn_create(db); + struct shash_node *table_node; + + SHASH_FOR_EACH (table_node, &unbacked_db->tables) { + struct ovsdb_table *src_table = table_node->data; + struct ovsdb_table *dst_table = ovsdb_get_table(db, table_node->name); + struct ovsdb_column_set all_columns = OVSDB_COLUMN_SET_INITIALIZER; + struct ovsdb_row *src; + + ovsdb_column_set_add_all(&all_columns, src_table); + HMAP_FOR_EACH (src, hmap_node, &src_table->rows) { + struct ovsdb_row *dst = ovsdb_row_create(dst_table); + struct json *json = ovsdb_row_to_json(src, &all_columns); + struct ovsdb_error *error = ovsdb_row_from_json(dst, json, + NULL, NULL); + json_destroy(json); + if (error) { + ovsdb_row_destroy(dst); + ovsdb_column_set_destroy(&all_columns); + ovsdb_txn_abort(txn); + return error; + } + *ovsdb_row_get_uuid_rw(dst) = *ovsdb_row_get_uuid(src); + ovsdb_txn_row_insert(txn, dst); + } + ovsdb_column_set_destroy(&all_columns); + } + + return ovsdb_txn_propose_commit_block(txn, false); +} + /* Create a monitor request for 'db'. The monitor request will include * any tables from 'blacklisted_tables' * -- 2.17.1 _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
