On 12/19/20 3:44 AM, Ben Pfaff wrote: > This change breaks the IDL into two layers: the IDL proper, whose > interface to its client is unchanged, and a low-level library called > the OVSDB "client synchronization" (CS) library. There are two > reasons for this change. First, the IDL is big and complicated and > I think that this change factors out some of that complication into > a simpler lower layer. Second, the OVN northd implementation based > on DDlog can benefit from the client synchronization library even > though it would actually be made increasingly complicated by the IDL. > > Signed-off-by: Ben Pfaff <b...@ovn.org> > ---
Hi Ben, Overall this looks OK to me. OVS & OVN unit tests pass with this series applied. There are however a couple memory leaks and I have another small comment inline. > lib/ovsdb-cs.c | 1955 ++++++++++++++++++++++++++++++++++ > lib/ovsdb-cs.h | 141 ++- > lib/ovsdb-idl-provider.h | 8 +- > lib/ovsdb-idl.c | 2161 +++++++++----------------------------- > 4 files changed, 2563 insertions(+), 1702 deletions(-) > > diff --git a/lib/ovsdb-cs.c b/lib/ovsdb-cs.c > index f37aa5b04414..285b164108b2 100644 > --- a/lib/ovsdb-cs.c > +++ b/lib/ovsdb-cs.c > @@ -39,6 +39,1947 @@ > #include "uuid.h" > > VLOG_DEFINE_THIS_MODULE(ovsdb_cs); > + > +/* Connection state machine. > + * > + * When a JSON-RPC session connects, the CS layer sends a "monitor_cond" > + * request for the Database table in the _Server database and transitions to > + * the CS_S_SERVER_MONITOR_REQUESTED state. If the session drops and > + * reconnects, or if the CS receives a "monitor_canceled" notification for a > + * table it is monitoring, the CS starts over again in the same way. */ > +#define OVSDB_CS_STATES \ > + /* Waits for "get_schema" reply, then sends "monitor_cond" \ > + * request for the Database table in the _Server database, whose \ > + * details are informed by the schema, and transitions to \ > + * CS_S_SERVER_MONITOR_REQUESTED. */ \ > + OVSDB_CS_STATE(SERVER_SCHEMA_REQUESTED) \ > + \ > + /* Waits for "monitor_cond" reply for the Database table: \ > + * \ > + * - If the reply indicates success, and the Database table has a \ > + * row for the CS database: \ > + * \ > + * * If the row indicates that this is a clustered database \ > + * that is not connected to the cluster, closes the \ > + * connection. The next connection attempt has a chance at \ > + * picking a connected server. 
\ > + * \ > + * * Otherwise, sends a monitoring request for the CS \ > + * database whose details are informed by the schema \ > + * (obtained from the row), and transitions to \ > + * CS_S_DATA_MONITOR_(COND_(SINCE_))REQUESTED. \ > + * \ > + * - If the reply indicates success, but the Database table does \ > + * not have a row for the CS database, transitions to \ > + * CS_S_ERROR. \ > + * \ > + * - If the reply indicates failure, sends a "get_schema" request \ > + * for the CS database and transitions to \ > + * CS_S_DATA_SCHEMA_REQUESTED. */ \ > + OVSDB_CS_STATE(SERVER_MONITOR_REQUESTED) \ > + \ > + /* Waits for "get_schema" reply, then sends "monitor_cond" \ > + * request whose details are informed by the schema, and \ > + * transitions to CS_S_DATA_MONITOR_COND_REQUESTED. */ \ > + OVSDB_CS_STATE(DATA_SCHEMA_REQUESTED) \ > + \ > + /* Waits for "monitor_cond_since" reply. If successful, replaces \ > + * the CS contents by the data carried in the reply and \ > + * transitions to CS_S_MONITORING. On failure, sends a \ > + * "monitor_cond" request and transitions to \ > + * CS_S_DATA_MONITOR_COND_REQUESTED. */ \ > + OVSDB_CS_STATE(DATA_MONITOR_COND_SINCE_REQUESTED) \ > + \ > + /* Waits for "monitor_cond" reply. If successful, replaces the \ > + * CS contents by the data carried in the reply and transitions \ > + * to CS_S_MONITORING. On failure, sends a "monitor" request \ > + * and transitions to CS_S_DATA_MONITOR_REQUESTED. */ \ > + OVSDB_CS_STATE(DATA_MONITOR_COND_REQUESTED) \ > + \ > + /* Waits for "monitor" reply. If successful, replaces the CS \ > + * contents by the data carried in the reply and transitions to \ > + * CS_S_MONITORING. On failure, transitions to CS_S_ERROR. */ \ > + OVSDB_CS_STATE(DATA_MONITOR_REQUESTED) \ > + \ > + /* State that processes "update", "update2" or "update3" \ > + * notifications for the main database (and the Database table \ > + * in _Server if available). 
\ > + * \ > + * If we're monitoring the Database table and we get notified \ > + * that the CS database has been deleted, we close the \ > + * connection (which will restart the state machine). */ \ > + OVSDB_CS_STATE(MONITORING) \ > + \ > + /* Terminal error state that indicates that nothing useful can be \ > + * done, for example because the database server doesn't actually \ > + * have the desired database. We maintain the session with the \ > + * database server anyway. If it starts serving the database \ > + * that we want, or if someone fixes and restarts the database, \ > + * then it will kill the session and we will automatically \ > + * reconnect and try again. */ \ > + OVSDB_CS_STATE(ERROR) \ > + \ > + /* Terminal state that indicates we connected to a useless server \ > + * in a cluster, e.g. one that is partitioned from the rest of \ > + * the cluster. We're waiting to retry. */ \ > + OVSDB_CS_STATE(RETRY) > + > +enum ovsdb_cs_state { > +#define OVSDB_CS_STATE(NAME) CS_S_##NAME, > + OVSDB_CS_STATES > +#undef OVSDB_CS_STATE > +}; > + > +static const char * > +ovsdb_cs_state_to_string(enum ovsdb_cs_state state) > +{ > + switch (state) { > +#define OVSDB_CS_STATE(NAME) case CS_S_##NAME: return #NAME; > + OVSDB_CS_STATES > +#undef OVSDB_CS_STATE > + default: return "<unknown>"; > + } > +} > + > +/* A database being monitored. > + * > + * There are two instances of this data structure for each CS instance, one > for > + * the _Server database used for working with clusters, and the other one for > + * the actual database that the client is interested in. */ > +struct ovsdb_cs_db { > + struct ovsdb_cs *cs; > + > + /* Data. */ > + const char *db_name; /* Database's name. */ > + struct hmap tables; /* Contains "struct ovsdb_cs_db_table *"s.*/ > + struct json *monitor_id; > + struct json *schema; > + > + /* Monitor version. */ > + int max_version; /* Maximum version of monitor request to > use. 
*/ > + int monitor_version; /* 0 if not monitoring, 1=monitor, > + * 2=monitor_cond, 3=monitor_cond_since. */ > + > + /* Condition changes. */ > + bool cond_changed; /* Change not yet sent to server? */ > + unsigned int cond_seqno; /* Increments when condition changes. */ > + > + /* Database locking. */ > + char *lock_name; /* Name of lock we need, NULL if none. */ > + bool has_lock; /* Has db server told us we have the lock? */ > + bool is_lock_contended; /* Has db server told us we can't get lock? > */ > + struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. > */ > + > + /* Last db txn id, used for fast resync through monitor_cond_since */ > + struct uuid last_id; > + > + /* Client interface. */ > + struct ovs_list events; > + const struct ovsdb_cs_ops *ops; > + void *ops_aux; > +}; > + > +static const struct ovsdb_cs_ops ovsdb_cs_server_ops; > + > +static void ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *); > +static unsigned int ovsdb_cs_db_set_condition( > + struct ovsdb_cs_db *, const char *db_name, const struct json *condition); > + > +static void ovsdb_cs_send_schema_request(struct ovsdb_cs *, > + struct ovsdb_cs_db *); > +static void ovsdb_cs_send_db_change_aware(struct ovsdb_cs *); > +static bool ovsdb_cs_check_server_db(struct ovsdb_cs *); > +static void ovsdb_cs_clear_server_rows(struct ovsdb_cs *); > +static void ovsdb_cs_send_monitor_request(struct ovsdb_cs *, > + struct ovsdb_cs_db *, int version); > +static void ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db); > +static void ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db); > + > +struct ovsdb_cs { > + struct ovsdb_cs_db server; > + struct ovsdb_cs_db data; > + > + /* Session state. > + * > + * 'state_seqno' is a snapshot of the session's sequence number as > returned > + * jsonrpc_session_get_seqno(session), so if it differs from the value > that > + * function currently returns then the session has reconnected and the > + * state machine must restart. 
*/ > + struct jsonrpc_session *session; /* Connection to the server. */ > + char *remote; /* 'session' remote name. */ > + enum ovsdb_cs_state state; /* Current session state. */ > + unsigned int state_seqno; /* See above. */ > + struct json *request_id; /* JSON ID for request awaiting reply. > */ > + > + /* IDs of outstanding transactions. */ > + struct json **txns; > + size_t n_txns, allocated_txns; > + > + /* Info for the _Server database. */ > + struct uuid cid; > + struct hmap server_rows; > + > + /* Clustered servers. */ > + uint64_t min_index; /* Minimum allowed index, to avoid regression. > */ > + bool leader_only; /* If true, do not connect to Raft followers. */ > + bool shuffle_remotes; /* If true, connect to servers in random order. > */ > +}; > + > +static void ovsdb_cs_transition_at(struct ovsdb_cs *, enum ovsdb_cs_state, > + const char *where); > +#define ovsdb_cs_transition(CS, STATE) \ > + ovsdb_cs_transition_at(CS, STATE, OVS_SOURCE_LOCATOR) > + > +static void ovsdb_cs_retry_at(struct ovsdb_cs *, const char *where); > +#define ovsdb_cs_retry(CS) ovsdb_cs_retry_at(CS, OVS_SOURCE_LOCATOR) > + > +static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5); > + > +static void ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *, > + const struct json *result, > + int version); > +static bool ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *, > + const struct jsonrpc_msg *); > +static bool ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *, > + struct ovsdb_cs_db *, > + const struct jsonrpc_msg *); > + > +static bool ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *, > + const struct jsonrpc_msg *); > +static struct jsonrpc_msg *ovsdb_cs_db_compose_lock_request( > + struct ovsdb_cs_db *); > +static struct jsonrpc_msg *ovsdb_cs_db_compose_unlock_request( > + struct ovsdb_cs_db *); > +static void ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *, > + const struct json *); > +static bool ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *, > + 
const struct json *params, > + bool new_has_lock); > +static void ovsdb_cs_send_cond_change(struct ovsdb_cs *); > + > +static bool ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *, > + const struct jsonrpc_msg *reply); > + > +/* Events. */ > + > +void > +ovsdb_cs_event_destroy(struct ovsdb_cs_event *event) > +{ > + if (event) { > + switch (event->type) { > + case OVSDB_CS_EVENT_TYPE_RECONNECT: > + case OVSDB_CS_EVENT_TYPE_LOCKED: > + break; > + > + case OVSDB_CS_EVENT_TYPE_UPDATE: > + json_destroy(event->update.table_updates); > + break; > + > + case OVSDB_CS_EVENT_TYPE_TXN_REPLY: > + jsonrpc_msg_destroy(event->txn_reply); > + break; > + } > + free(event); > + } > +} > + > +/* Lifecycle. */ > + > +static void > +ovsdb_cs_db_init(struct ovsdb_cs_db *db, const char *db_name, > + struct ovsdb_cs *parent, int max_version, > + const struct ovsdb_cs_ops *ops, void *ops_aux) > +{ > + *db = (struct ovsdb_cs_db) { > + .cs = parent, > + .db_name = db_name, > + .tables = HMAP_INITIALIZER(&db->tables), > + .max_version = max_version, > + .monitor_id = json_array_create_2(json_string_create("monid"), > + json_string_create(db_name)), > + .events = OVS_LIST_INITIALIZER(&db->events), > + .ops = ops, > + .ops_aux = ops_aux, > + }; > +} > + > +/* Creates and returns a new client synchronization object. The connection > + * will monitor remote database 'db_name'. If 'retry' is true, then also > + * reconnect if the connection fails. > + * > + * XXX 'max_version' should ordinarily be 3, to allow use of the most > efficient > + * "monitor_cond_since" method with the database. Currently there's some > kind > + * of bug in the DDlog Rust code that interfaces to that, so instead > + * ovn-northd-ddlog passes 1 to use plain 'monitor' instead. Once the DDlog > + * Rust code gets fixed, we might as well just delete 'max_version' > + * entirely. > + * > + * 'ops' is a struct for northd_cs_run() to use, and 'ops_aux' is a pointer > + * that gets passed into each call. 
> + * > + * Use ovsdb_cs_set_remote() to configure the database to which to connect. > + * Until a remote is configured, no data can be retrieved. > + */ > +struct ovsdb_cs * > +ovsdb_cs_create(const char *db_name, int max_version, > + const struct ovsdb_cs_ops *ops, void *ops_aux) > +{ > + struct ovsdb_cs *cs = xzalloc(sizeof *cs); > + ovsdb_cs_db_init(&cs->server, "_Server", cs, 2, &ovsdb_cs_server_ops, > cs); > + ovsdb_cs_db_init(&cs->data, db_name, cs, max_version, ops, ops_aux); > + cs->state_seqno = UINT_MAX; > + cs->request_id = NULL; > + cs->leader_only = true; > + cs->shuffle_remotes = true; > + hmap_init(&cs->server_rows); > + > + return cs; > +} > + > +static void > +ovsdb_cs_db_destroy(struct ovsdb_cs_db *db) > +{ > + ovsdb_cs_db_destroy_tables(db); > + > + json_destroy(db->monitor_id); > + json_destroy(db->schema); > + > + free(db->lock_name); > + > + json_destroy(db->lock_request_id); > + > + /* This list always gets flushed out at the end of ovsdb_cs_run(). */ > + ovs_assert(ovs_list_is_empty(&db->events)); > +} > + > +/* Destroys 'cs' and all of the data structures that it manages. */ > +void > +ovsdb_cs_destroy(struct ovsdb_cs *cs) > +{ > + if (cs) { > + ovsdb_cs_db_destroy(&cs->server); > + ovsdb_cs_db_destroy(&cs->data); > + jsonrpc_session_close(cs->session); > + free(cs->remote); > + json_destroy(cs->request_id); > + > + for (size_t i = 0; i < cs->n_txns; i++) { > + json_destroy(cs->txns[i]); > + } > + free(cs->txns); > + > + ovsdb_cs_clear_server_rows(cs); > + hmap_destroy(&cs->server_rows); > + > + free(cs); > + } > +} > + > +static void > +ovsdb_cs_transition_at(struct ovsdb_cs *cs, enum ovsdb_cs_state new_state, > + const char *where) > +{ > + VLOG_DBG("%s: %s -> %s at %s", > + cs->session ? 
jsonrpc_session_get_name(cs->session) : "void", > + ovsdb_cs_state_to_string(cs->state), > + ovsdb_cs_state_to_string(new_state), > + where); > + cs->state = new_state; > +} > + > +static void > +ovsdb_cs_send_request(struct ovsdb_cs *cs, struct jsonrpc_msg *request) > +{ > + json_destroy(cs->request_id); > + cs->request_id = json_clone(request->id); > + if (cs->session) { > + jsonrpc_session_send(cs->session, request); > + } else { > + jsonrpc_msg_destroy(request); > + } > +} > + > +static void > +ovsdb_cs_retry_at(struct ovsdb_cs *cs, const char *where) > +{ > + ovsdb_cs_force_reconnect(cs); > + ovsdb_cs_transition_at(cs, CS_S_RETRY, where); > +} > + > +static void > +ovsdb_cs_restart_fsm(struct ovsdb_cs *cs) > +{ > + /* Resync data DB table conditions to avoid missing updates due to > + * conditions that were in flight or changed locally while the connection > + * was down. > + */ > + ovsdb_cs_db_sync_condition(&cs->data); > + > + ovsdb_cs_send_schema_request(cs, &cs->server); > + ovsdb_cs_transition(cs, CS_S_SERVER_SCHEMA_REQUESTED); > + cs->data.monitor_version = 0; > + cs->server.monitor_version = 0; > +} > + > +static void > +ovsdb_cs_process_response(struct ovsdb_cs *cs, struct jsonrpc_msg *msg) > +{ > + bool ok = msg->type == JSONRPC_REPLY; > + if (!ok > + && cs->state != CS_S_SERVER_SCHEMA_REQUESTED > + && cs->state != CS_S_SERVER_MONITOR_REQUESTED > + && cs->state != CS_S_DATA_MONITOR_COND_REQUESTED > + && cs->state != CS_S_DATA_MONITOR_COND_SINCE_REQUESTED) { > + static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(5, 5); > + char *s = jsonrpc_msg_to_string(msg); > + VLOG_INFO_RL(&rl, "%s: received unexpected %s response in " > + "%s state: %s", jsonrpc_session_get_name(cs->session), > + jsonrpc_msg_type_to_string(msg->type), > + ovsdb_cs_state_to_string(cs->state), > + s); > + free(s); > + ovsdb_cs_retry(cs); > + return; > + } > + > + switch (cs->state) { > + case CS_S_SERVER_SCHEMA_REQUESTED: > + if (ok) { > + json_destroy(cs->server.schema); > + 
cs->server.schema = json_clone(msg->result); > + ovsdb_cs_send_monitor_request(cs, &cs->server, > + cs->server.max_version); > + ovsdb_cs_transition(cs, CS_S_SERVER_MONITOR_REQUESTED); > + } else { > + ovsdb_cs_send_schema_request(cs, &cs->data); > + ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED); > + } > + break; > + > + case CS_S_SERVER_MONITOR_REQUESTED: > + if (ok) { > + cs->server.monitor_version = cs->server.max_version; > + ovsdb_cs_db_parse_monitor_reply(&cs->server, msg->result, > + cs->server.monitor_version); > + if (ovsdb_cs_check_server_db(cs)) { > + ovsdb_cs_send_db_change_aware(cs); > + } > + } else { > + ovsdb_cs_send_schema_request(cs, &cs->data); > + ovsdb_cs_transition(cs, CS_S_DATA_SCHEMA_REQUESTED); > + } > + break; > + > + case CS_S_DATA_SCHEMA_REQUESTED: > + json_destroy(cs->data.schema); > + cs->data.schema = json_clone(msg->result); > + if (cs->data.max_version >= 2) { > + ovsdb_cs_send_monitor_request(cs, &cs->data, 2); > + ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED); > + } else { > + ovsdb_cs_send_monitor_request(cs, &cs->data, 1); > + ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED); > + } > + break; > + > + case CS_S_DATA_MONITOR_COND_SINCE_REQUESTED: > + if (!ok) { > + /* "monitor_cond_since" not supported. Try "monitor_cond". */ > + ovsdb_cs_send_monitor_request(cs, &cs->data, 2); > + ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_COND_REQUESTED); > + } else { > + cs->data.monitor_version = 3; > + ovsdb_cs_transition(cs, CS_S_MONITORING); > + ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 3); > + } > + break; > + > + case CS_S_DATA_MONITOR_COND_REQUESTED: > + if (!ok) { > + /* "monitor_cond" not supported. Try "monitor". 
*/ > + ovsdb_cs_send_monitor_request(cs, &cs->data, 1); > + ovsdb_cs_transition(cs, CS_S_DATA_MONITOR_REQUESTED); > + } else { > + cs->data.monitor_version = 2; > + ovsdb_cs_transition(cs, CS_S_MONITORING); > + ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 2); > + } > + break; > + > + case CS_S_DATA_MONITOR_REQUESTED: > + cs->data.monitor_version = 1; > + ovsdb_cs_transition(cs, CS_S_MONITORING); > + ovsdb_cs_db_parse_monitor_reply(&cs->data, msg->result, 1); > + break; > + > + case CS_S_MONITORING: > + /* We don't normally have a request outstanding in this state. If we > + * do, it's a "monitor_cond_change", which means that the conditional > + * monitor clauses were updated. > + * > + * Mark the last requested conditions as acked and if further > + * condition changes were pending, send them now. */ > + ovsdb_cs_db_ack_condition(&cs->data); > + ovsdb_cs_send_cond_change(cs); > + cs->data.cond_seqno++; > + break; > + > + case CS_S_ERROR: > + case CS_S_RETRY: > + /* Nothing to do in this state. */ > + break; > + > + default: > + OVS_NOT_REACHED(); > + } > +} > + > +static void > +ovsdb_cs_process_msg(struct ovsdb_cs *cs, struct jsonrpc_msg *msg) > +{ > + bool is_response = (msg->type == JSONRPC_REPLY || > + msg->type == JSONRPC_ERROR); > + > + /* Process a reply to an outstanding request. */ > + if (is_response > + && cs->request_id && json_equal(cs->request_id, msg->id)) { > + json_destroy(cs->request_id); > + cs->request_id = NULL; > + ovsdb_cs_process_response(cs, msg); > + return; > + } > + > + /* Process database contents updates. 
*/ > + if (ovsdb_cs_db_parse_update_rpc(&cs->data, msg)) { > + return; > + } > + if (cs->server.monitor_version > + && ovsdb_cs_db_parse_update_rpc(&cs->server, msg)) { > + ovsdb_cs_check_server_db(cs); > + return; > + } > + > + if (ovsdb_cs_handle_monitor_canceled(cs, &cs->data, msg) > + || (cs->server.monitor_version > + && ovsdb_cs_handle_monitor_canceled(cs, &cs->server, msg))) { > + return; > + } > + > + /* Process "lock" replies and related notifications. */ > + if (ovsdb_cs_db_process_lock_replies(&cs->data, msg)) { > + return; > + } > + > + /* Process response to a database transaction we submitted. */ > + if (is_response && ovsdb_cs_db_txn_process_reply(cs, msg)) { > + return; > + } > + > + /* Unknown message. Log at a low level because this can happen if > + * ovsdb_cs_txn_destroy() is called to destroy a transaction > + * before we receive the reply. > + * > + * (We could sort those out from other kinds of unknown messages by > + * using distinctive IDs for transactions, if it seems valuable to > + * do so, and then it would be possible to use different log > + * levels. XXX?) */ > + char *s = jsonrpc_msg_to_string(msg); > + VLOG_DBG("%s: received unexpected %s message: %s", > + jsonrpc_session_get_name(cs->session), > + jsonrpc_msg_type_to_string(msg->type), s); > + free(s); > +} > + > +static struct ovsdb_cs_event * > +ovsdb_cs_db_add_event(struct ovsdb_cs_db *db, enum ovsdb_cs_event_type type) > +{ > + struct ovsdb_cs_event *event = xmalloc(sizeof *event); > + event->type = type; > + ovs_list_push_back(&db->events, &event->list_node); > + return event; > +} > + > +/* Processes a batch of messages from the database server on 'cs'. This may > + * cause the CS's contents to change. > + * > + * Initializes 'events' with a list of events that occurred on 'cs'. The > + * caller must process and destroy all of the events. 
*/ > +void > +ovsdb_cs_run(struct ovsdb_cs *cs, struct ovs_list *events) > +{ > + ovs_list_init(events); > + if (!cs->session) { > + return; > + } > + > + ovsdb_cs_send_cond_change(cs); > + > + jsonrpc_session_run(cs->session); > + > + unsigned int seqno = jsonrpc_session_get_seqno(cs->session); > + if (cs->state_seqno != seqno) { > + cs->state_seqno = seqno; > + ovsdb_cs_restart_fsm(cs); > + > + for (size_t i = 0; i < cs->n_txns; i++) { > + json_destroy(cs->txns[i]); > + } > + cs->n_txns = 0; > + > + ovsdb_cs_db_add_event(&cs->data, OVSDB_CS_EVENT_TYPE_RECONNECT); > + > + if (cs->data.lock_name) { > + jsonrpc_session_send( > + cs->session, > + ovsdb_cs_db_compose_lock_request(&cs->data)); > + } > + } > + > + for (int i = 0; i < 50; i++) { > + struct jsonrpc_msg *msg = jsonrpc_session_recv(cs->session); > + if (!msg) { > + break; > + } > + ovsdb_cs_process_msg(cs, msg); > + jsonrpc_msg_destroy(msg); > + } > + ovs_list_push_back_all(events, &cs->data.events); > +} > + > +/* Arranges for poll_block() to wake up when ovsdb_cs_run() has something to > + * do or when activity occurs on a transaction on 'cs'. */ > +void > +ovsdb_cs_wait(struct ovsdb_cs *cs) > +{ > + if (!cs->session) { > + return; > + } > + jsonrpc_session_wait(cs->session); > + jsonrpc_session_recv_wait(cs->session); > +} > + > +/* Network connection. */ > + > +/* Changes the remote and creates a new session. > + * > + * If 'retry' is true, the connection to the remote will automatically retry > + * when it fails. If 'retry' is false, the connection is one-time. */ > +void > +ovsdb_cs_set_remote(struct ovsdb_cs *cs, const char *remote, bool retry) > +{ > + if (cs > + && ((remote != NULL) != (cs->remote != NULL) > + || (remote && cs->remote && strcmp(remote, cs->remote)))) { > + /* Close the old session, if any. */ > + if (cs->session) { > + jsonrpc_session_close(cs->session); > + cs->session = NULL; > + > + free(cs->remote); > + cs->remote = NULL; > + } > + > + /* Open new session, if any. 
*/ > + if (remote) { > + struct svec remotes = SVEC_EMPTY_INITIALIZER; > + ovsdb_session_parse_remote(remote, &remotes, &cs->cid); > + if (cs->shuffle_remotes) { > + svec_shuffle(&remotes); > + } > + cs->session = jsonrpc_session_open_multiple(&remotes, retry); > + svec_destroy(&remotes); > + > + cs->state_seqno = UINT_MAX; > + > + cs->remote = xstrdup(remote); > + } > + } > +} > + > +/* Reconfigures 'cs' so that it will reconnect to the database if > + * the connection is dropped. */ > +void > +ovsdb_cs_enable_reconnect(struct ovsdb_cs *cs) > +{ > + if (cs->session) { > + jsonrpc_session_enable_reconnect(cs->session); > + } > +} > + > +/* Forces 'cs' to drop its connection to the database and reconnect. In the > + * meantime, the contents of 'cs' will not change. */ > +void > +ovsdb_cs_force_reconnect(struct ovsdb_cs *cs) > +{ > + if (cs->session) { > + jsonrpc_session_force_reconnect(cs->session); > + } > +} > + > +/* Drops 'cs''s current connection and the cached session. This is useful if > + * the client notices some kind of inconsistency. */ > +void > +ovsdb_cs_flag_inconsistency(struct ovsdb_cs *cs) > +{ > + cs->data.last_id = UUID_ZERO; > + ovsdb_cs_retry(cs); > +} > + > +/* Returns true if 'cs' is currently connected or will eventually try to > + * reconnect. */ > +bool > +ovsdb_cs_is_alive(const struct ovsdb_cs *cs) > +{ > + return (cs->session > + && jsonrpc_session_is_alive(cs->session) > + && cs->state != CS_S_ERROR); > +} > + > +/* Returns true if 'cs' is currently connected to a server. */ > +bool > +ovsdb_cs_is_connected(const struct ovsdb_cs *cs) > +{ > + return cs->session && jsonrpc_session_is_connected(cs->session); > +} > + > +/* Returns the last error reported on a connection by 'cs'. The return value > + * is 0 only if no connection made by 'cs' has ever encountered an error and > + * a negative response to a schema request has never been received.
See > + * jsonrpc_get_status() for jsonrpc_session_get_last_error() return value > + * interpretation. */ > +int > +ovsdb_cs_get_last_error(const struct ovsdb_cs *cs) > +{ > + int err = cs->session ? jsonrpc_session_get_last_error(cs->session) : 0; > + if (err) { > + return err; > + } else if (cs->state == CS_S_ERROR) { > + return ENOENT; > + } else { > + return 0; > + } > +} > + > +/* Sets the "probe interval" for 'cs''s current session to 'probe_interval', > in > + * milliseconds. */ > +void > +ovsdb_cs_set_probe_interval(const struct ovsdb_cs *cs, int probe_interval) > +{ > + if (cs->session) { > + jsonrpc_session_set_probe_interval(cs->session, probe_interval); > + } > +} > + > +/* Conditional monitoring. */ > + > +/* A table being monitored. > + * > + * At the CS layer, the only thing we care about, table-wise, is the > conditions > + * we're using for monitoring them, so there's little here. We only create > + * these table structures at all for tables that have conditions. */ > +struct ovsdb_cs_db_table { > + struct hmap_node hmap_node; /* Indexed by 'name'. */ > + const char *name; /* Table name. */ > + > + /* Each of these is a null pointer if it is empty, or JSON [<condition>*] > + * or [true] or [false] otherwise. [true] could be represented as a null > + * pointer, but we want to distinguish "empty slot" from "a condition > that > + * is always true" in a slot. */ > + struct json *ack_cond; /* Last condition acked by the server. */ > + struct json *req_cond; /* Last condition requested to the server. */ > + struct json *new_cond; /* Latest condition set by the IDL client. */ > +}; > + > +/* A kind of condition, so that we can treat equivalent JSON as equivalent. > */ > +enum condition_type { > + COND_FALSE, /* [] or [false] */ > + COND_TRUE, /* Null pointer or [true] */ > + COND_OTHER /* Anything else. */ > +}; > + > +/* Returns the condition_type for 'condition'. 
*/ > +static enum condition_type > +condition_classify(const struct json *condition) > +{ > + if (condition) { > + const struct json_array *a = json_array(condition); > + switch (a->n) { > + case 0: > + return COND_FALSE; > + > + case 1: > + return (a->elems[0]->type == JSON_FALSE ? COND_FALSE > + : a->elems[0]->type == JSON_TRUE ? COND_TRUE > + : COND_OTHER); > + > + default: > + return COND_OTHER; > + } > + } else { > + return COND_TRUE; > + } > +} > + > +/* Returns true if 'a' and 'b' are the same condition (in an obvious way; > we're > + * not going to compare for boolean equivalence or anything). */ > +static bool > +condition_equal(const struct json *a, const struct json *b) > +{ > + enum condition_type type = condition_classify(a); > + return (type == condition_classify(b) > + && (type != COND_OTHER || json_equal(a, b))); > +} > + > +/* Returns a clone of 'condition', translating always-true and always-false > to > + * [true] and [false], respectively. */ > +static struct json * > +condition_clone(const struct json *condition) > +{ > + switch (condition_classify(condition)) { > + case COND_TRUE: > + return json_array_create_1(json_boolean_create(true)); > + > + case COND_FALSE: > + return json_array_create_1(json_boolean_create(false)); > + > + case COND_OTHER: > + return json_clone(condition); > + } > + > + OVS_NOT_REACHED(); > +} > + > +/* Returns the ovsdb_cs_db_table associated with 'table' in 'db', creating an > + * empty one if necessary. 
*/ > +static struct ovsdb_cs_db_table * > +ovsdb_cs_db_get_table(struct ovsdb_cs_db *db, const char *table) > +{ > + uint32_t hash = hash_string(table, 0); > + struct ovsdb_cs_db_table *t; > + > + HMAP_FOR_EACH_WITH_HASH (t, hmap_node, hash, &db->tables) { > + if (!strcmp(t->name, table)) { > + return t; > + } > + } > + > + t = xzalloc(sizeof *t); > + t->name = xstrdup(table); > + t->new_cond = json_array_create_1(json_boolean_create(true)); > + hmap_insert(&db->tables, &t->hmap_node, hash); > + return t; > +} > + > +static void > +ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *db) > +{ > + struct ovsdb_cs_db_table *table, *next; > + HMAP_FOR_EACH_SAFE (table, next, hmap_node, &db->tables) { > + json_destroy(table->ack_cond); > + json_destroy(table->req_cond); > + json_destroy(table->new_cond); We leak both 'table' and 'table->name' because we don't free them here. > + hmap_remove(&db->tables, &table->hmap_node); > + } > + hmap_destroy(&db->tables); > +} > + > +static unsigned int > +ovsdb_cs_db_set_condition(struct ovsdb_cs_db *db, const char *table, > + const struct json *condition) > +{ > + /* Compare the new condition to the last known condition which can be > + * either "new" (not sent yet), "requested" or "acked", in this order. */ > + struct ovsdb_cs_db_table *t = ovsdb_cs_db_get_table(db, table); > + const struct json *table_cond = (t->new_cond ? t->new_cond > + : t->req_cond ? t->req_cond > + : t->ack_cond); > + if (!condition_equal(condition, table_cond)) { > + json_destroy(t->new_cond); > + t->new_cond = condition_clone(condition); > + db->cond_changed = true; > + poll_immediate_wake(); > + } > + > + /* Conditions will be up to date when we receive replies for already > + * requested and new conditions, if any. */ > + return db->cond_seqno + (t->new_cond ? 1 : 0) + (t->req_cond ? 1 : 0); > +} > + > +/* Sets the replication condition for 'tc' in 'cs' to 'condition' and > arranges > + * to send the new condition to the database server. 
> * > + * Return the next conditional update sequence number. When this value and > + * ovsdb_cs_get_condition_seqno() match, 'cs' contains rows that match the > + * 'condition'. */ > +unsigned int > +ovsdb_cs_set_condition(struct ovsdb_cs *cs, const char *table, > + const struct json *condition) > +{ > + return ovsdb_cs_db_set_condition(&cs->data, table, condition); > +} > + > +/* Returns a "sequence number" that represents the number of conditional > + * monitoring updates successfully received by the OVSDB server of a CS > + * connection. > + * > + * When ovsdb_cs_set_condition() sets a new condition that is different from > + * the current condition, the next expected "sequence number" is returned. > + * > + * Whenever ovsdb_cs_get_condition_seqno() returns a value that matches the > + * return value of ovsdb_cs_set_condition(), the client is assured that: > + * > + * - The ovsdb_cs_set_condition() changes have been acknowledged by the > OVSDB > + * server. > + * > + * - 'cs' now contains content that matches the new conditions. */ > +unsigned int > +ovsdb_cs_get_condition_seqno(const struct ovsdb_cs *cs) > +{ > + return cs->data.cond_seqno; > +} > + > +static struct json * > +ovsdb_cs_create_cond_change_req(const struct json *cond) > +{ > + struct json *monitor_cond_change_request = json_object_create(); > + json_object_put(monitor_cond_change_request, "where", json_clone(cond)); > + return monitor_cond_change_request; > +} > + > +static struct jsonrpc_msg * > +ovsdb_cs_db_compose_cond_change(struct ovsdb_cs_db *db) > +{ > + if (!db->cond_changed) { > + return NULL; > + } > + > + struct json *monitor_cond_change_requests = NULL; > + struct ovsdb_cs_db_table *table; > + HMAP_FOR_EACH (table, hmap_node, &db->tables) { > + /* Always use the most recent conditions set by the CS client when > + * requesting monitor_cond_change, i.e., table->new_cond.
> +         */
> +        if (table->new_cond) {
> +            struct json *req =
> +                ovsdb_cs_create_cond_change_req(table->new_cond);
> +            if (req) {
> +                if (!monitor_cond_change_requests) {
> +                    monitor_cond_change_requests = json_object_create();
> +                }
> +                json_object_put(monitor_cond_change_requests,
> +                                table->name,
> +                                json_array_create_1(req));
> +            }
> +            /* Mark the new condition as requested by moving it to req_cond.
> +             * If there's already a requested condition, that's a bug.
> +             */
> +            ovs_assert(table->req_cond == NULL);
> +            table->req_cond = table->new_cond;
> +            table->new_cond = NULL;
> +        }
> +    }
> +
> +    if (!monitor_cond_change_requests) {
> +        return NULL;
> +    }
> +
> +    db->cond_changed = false;
> +    struct json *params = json_array_create_3(json_clone(db->monitor_id),
> +                                              json_clone(db->monitor_id),
> +                                              monitor_cond_change_requests);
> +    return jsonrpc_create_request("monitor_cond_change", params, NULL);
> +}
> +
> +/* Marks all requested table conditions in 'db' as acked by the server.
> + * It should be called when the server replies to monitor_cond_change
> + * requests.
> + */
> +static void
> +ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db)
> +{
> +    struct ovsdb_cs_db_table *table;
> +    HMAP_FOR_EACH (table, hmap_node, &db->tables) {
> +        if (table->req_cond) {
> +            json_destroy(table->ack_cond);
> +            table->ack_cond = table->req_cond;
> +            table->req_cond = NULL;
> +        }
> +    }
> +}
> +
> +/* Should be called when the CS fsm is restarted and resyncs table conditions
> + * based on the state the DB is in:
> + *   - if a non-zero last_id is available for the DB then upon reconnect
> + *     the CS should first request acked conditions to avoid missing updates
> + *     about records that were added before the transaction with
> + *     txn-id == last_id.
> + *     If there were requested condition changes in flight (i.e., req_cond
> + *     not NULL) and the CS client didn't set new conditions (i.e., new_cond
> + *     is NULL) then move req_cond to new_cond to trigger a follow up
> + *     monitor_cond_change request.
> + *   - if there's no last_id available for the DB then it's safe to use the
> + *     latest conditions set by the CS client even if they weren't acked yet.
> + */
> +static void
> +ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db)
> +{
> +    bool ack_all = uuid_is_zero(&db->last_id);
> +    if (ack_all) {
> +        db->cond_changed = false;
> +    }
> +
> +    struct ovsdb_cs_db_table *table;
> +    HMAP_FOR_EACH (table, hmap_node, &db->tables) {
> +        /* When monitor_cond_since requests are issued, the table->ack_cond
> +         * condition will be added to the "where" clause.  Follow up
> +         * monitor_cond_change requests will use table->new_cond.
> +         */
> +        if (ack_all) {
> +            if (table->new_cond) {
> +                json_destroy(table->req_cond);
> +                table->req_cond = table->new_cond;
> +                table->new_cond = NULL;
> +            }
> +
> +            if (table->req_cond) {
> +                json_destroy(table->ack_cond);
> +                table->ack_cond = table->req_cond;
> +                table->req_cond = NULL;
> +            }
> +        } else {
> +            /* If there was no "unsent" condition but instead a
> +             * monitor_cond_change request was in flight, move
> +             * table->req_cond to table->new_cond and set db->cond_changed
> +             * to trigger a new monitor_cond_change request.
> +             *
> +             * However, if a new condition has been set by the CS client,
> +             * monitor_cond_change will be sent anyway and will use the most
> +             * recent table->new_cond so there's no need to update it here.
> +             */
> +            if (table->req_cond) {
> +                if (table->new_cond) {
> +                    json_destroy(table->req_cond);
> +                } else {
> +                    table->new_cond = table->req_cond;
> +                }
> +                table->req_cond = NULL;
> +                db->cond_changed = true;
> +            }

This used to be:

    if (table->req_cond && !table->new_cond) {
        /* Move "req_cond" to "new_cond". */
        ovsdb_idl_condition_move(&table->new_cond, &table->req_cond);
        db->cond_changed = true;
    }

Which is what the comment above tried to explain.  Is there a case that was
missed in the old code?  If so, can we factor out the fix in a separate patch
to make it easier to track?

Thanks,
Dumitru

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev