On 12/18/20 6:31 AM, Ben Pfaff wrote:
> This change breaks the IDL into two layers: the IDL proper, whose
> interface to its client is unchanged, and a low-level library called
> the OVSDB "client synchronization" (CS) library. There are two
> reasons for this change. First, the IDL is big and complicated and
> I think that this change factors out some of that complication into
> a simpler lower layer. Second, the OVN northd implementation based
> on DDlog can benefit from the client synchronization library even
> though it would actually be made increasingly complicated by the IDL.
>
> Signed-off-by: Ben Pfaff <[email protected]>
> ---
Hi Ben,
When running OVN tests with this OVS series applied, AddressSanitizer
reports a couple of leaks.
One is straightforward to fix; please see my inline comment below.
The other one I couldn't figure out:
==1273641==ERROR: LeakSanitizer: detected memory leaks
Direct leak of 40 byte(s) in 1 object(s) allocated from:
#0 0x7f65764f1667 in __interceptor_malloc (/lib64/libasan.so.6+0xb0667)
#1 0x5b623f in xmalloc lib/util.c:138
#2 0x6074ad in json_create lib/json.c:1451
#3 0x601dee in json_integer_create lib/json.c:263
#4 0x60ae64 in jsonrpc_create_id lib/jsonrpc.c:563
#5 0x60ae7f in jsonrpc_create_request lib/jsonrpc.c:570
#6 0x66fbd6 in ovsdb_cs_send_transaction lib/ovsdb-cs.c:1357
#7 0x575c6b in ovsdb_idl_txn_commit lib/ovsdb-idl.c:3147
#8 0x575f1e in ovsdb_idl_txn_commit_block lib/ovsdb-idl.c:3186
#9 0x4103d4 in do_sbctl utilities/ovn-sbctl.c:1688
#10 0x407e39 in main utilities/ovn-sbctl.c:152
#11 0x7f6575c53041 in __libc_start_main (/lib64/libc.so.6+0x27041)
Regards,
Dumitru
> lib/ovsdb-cs.c | 1955 ++++++++++++++++++++++++++++++++++
> lib/ovsdb-cs.h | 141 ++-
> lib/ovsdb-idl-provider.h | 8 +-
> lib/ovsdb-idl.c | 2161 +++++++++-----------------------------
> 4 files changed, 2563 insertions(+), 1702 deletions(-)
>
> diff --git a/lib/ovsdb-cs.c b/lib/ovsdb-cs.c
> index f37aa5b04414..386d37b413f7 100644
> --- a/lib/ovsdb-cs.c
> +++ b/lib/ovsdb-cs.c
> @@ -39,6 +39,1947 @@
> #include "uuid.h"
>
> VLOG_DEFINE_THIS_MODULE(ovsdb_cs);
> +
> +/* Connection state machine.
> + *
> + * When a JSON-RPC session connects, the CS layer sends a "monitor_cond"
> + * request for the Database table in the _Server database and transitions to
> + * the CS_S_SERVER_MONITOR_REQUESTED state. If the session drops and
> + * reconnects, or if the CS receives a "monitor_canceled" notification for a
> + * table it is monitoring, the CS starts over again in the same way. */
> +#define OVSDB_CS_STATES \
> + /* Waits for "get_schema" reply, then sends "monitor_cond" \
> + * request for the Database table in the _Server database, whose \
> + * details are informed by the schema, and transitions to \
> + * CS_S_SERVER_MONITOR_REQUESTED. */ \
> + OVSDB_CS_STATE(SERVER_SCHEMA_REQUESTED) \
> + \
> + /* Waits for "monitor_cond" reply for the Database table: \
> + * \
> + * - If the reply indicates success, and the Database table has a \
> + * row for the CS database: \
> + * \
> + * * If the row indicates that this is a clustered database \
> + * that is not connected to the cluster, closes the \
> + * connection. The next connection attempt has a chance at \
> + * picking a connected server. \
> + * \
> + * * Otherwise, sends a monitoring request for the CS \
> + * database whose details are informed by the schema \
> + * (obtained from the row), and transitions to \
> + * CS_S_DATA_MONITOR_(COND_(SINCE_))REQUESTED. \
> + * \
> + * - If the reply indicates success, but the Database table does \
> + * not have a row for the CS database, transitions to \
> + * CS_S_ERROR. \
> + * \
> + * - If the reply indicates failure, sends a "get_schema" request \
> + * for the CS database and transitions to \
> + * CS_S_DATA_SCHEMA_REQUESTED. */ \
> + OVSDB_CS_STATE(SERVER_MONITOR_REQUESTED) \
> + \
> + /* Waits for "get_schema" reply, then sends "monitor_cond" \
> + * request whose details are informed by the schema, and \
> + * transitions to CS_S_DATA_MONITOR_COND_REQUESTED. */ \
> + OVSDB_CS_STATE(DATA_SCHEMA_REQUESTED) \
> + \
> + /* Waits for "monitor_cond_since" reply. If successful, replaces \
> + * the CS contents by the data carried in the reply and \
> + * transitions to CS_S_MONITORING. On failure, sends a \
> + * "monitor_cond" request and transitions to \
> + * CS_S_DATA_MONITOR_COND_REQUESTED. */ \
> + OVSDB_CS_STATE(DATA_MONITOR_COND_SINCE_REQUESTED) \
> + \
> + /* Waits for "monitor_cond" reply. If successful, replaces the \
> + * CS contents by the data carried in the reply and transitions \
> + * to CS_S_MONITORING. On failure, sends a "monitor" request \
> + * and transitions to CS_S_DATA_MONITOR_REQUESTED. */ \
> + OVSDB_CS_STATE(DATA_MONITOR_COND_REQUESTED) \
> + \
> + /* Waits for "monitor" reply. If successful, replaces the CS \
> + * contents by the data carried in the reply and transitions to \
> + * CS_S_MONITORING. On failure, transitions to CS_S_ERROR. */ \
> + OVSDB_CS_STATE(DATA_MONITOR_REQUESTED) \
> + \
> + /* State that processes "update", "update2" or "update3" \
> + * notifications for the main database (and the Database table \
> + * in _Server if available). \
> + * \
> + * If we're monitoring the Database table and we get notified \
> + * that the CS database has been deleted, we close the \
> + * connection (which will restart the state machine). */ \
> + OVSDB_CS_STATE(MONITORING) \
> + \
> + /* Terminal error state that indicates that nothing useful can be \
> + * done, for example because the database server doesn't actually \
> + * have the desired database. We maintain the session with the \
> + * database server anyway. If it starts serving the database \
> + * that we want, or if someone fixes and restarts the database, \
> + * then it will kill the session and we will automatically \
> + * reconnect and try again. */ \
> + OVSDB_CS_STATE(ERROR) \
> + \
> + /* Terminal state that indicates we connected to a useless server \
> + * in a cluster, e.g. one that is partitioned from the rest of \
> + * the cluster. We're waiting to retry. */ \
> + OVSDB_CS_STATE(RETRY)
> +
> +enum ovsdb_cs_state {
> +#define OVSDB_CS_STATE(NAME) CS_S_##NAME,
> + OVSDB_CS_STATES
> +#undef OVSDB_CS_STATE
> +};
> +
> +static const char *
> +ovsdb_cs_state_to_string(enum ovsdb_cs_state state)
> +{
> + switch (state) {
> +#define OVSDB_CS_STATE(NAME) case CS_S_##NAME: return #NAME;
> + OVSDB_CS_STATES
> +#undef OVSDB_CS_STATE
> + default: return "<unknown>";
> + }
> +}
> +
> +/* A database being monitored.
> + *
> + * There are two instances of this data structure for each CS instance, one for
> + * the _Server database used for working with clusters, and the other one for
> + * the actual database that the client is interested in. */
> +struct ovsdb_cs_db {
> + struct ovsdb_cs *cs;
> +
> + /* Data. */
> + const char *db_name; /* Database's name. */
> + struct hmap tables; /* Contains "struct ovsdb_cs_db_table *"s.*/
> + struct json *monitor_id;
> + struct json *schema;
> +
> + /* Monitor version. */
> + int max_version; /* Maximum version of monitor request to use. */
> + int monitor_version; /* 0 if not monitoring, 1=monitor,
> + * 2=monitor_cond, 3=monitor_cond_since. */
> +
> + /* Condition changes. */
> + bool cond_changed; /* Change not yet sent to server? */
> + unsigned int cond_seqno; /* Increments when condition changes. */
> +
> + /* Database locking. */
> + char *lock_name; /* Name of lock we need, NULL if none. */
> + bool has_lock; /* Has db server told us we have the lock? */
> + bool is_lock_contended; /* Has db server told us we can't get lock? */
> + struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
> +
> + /* Last db txn id, used for fast resync through monitor_cond_since */
> + struct uuid last_id;
> +
> + /* Client interface. */
> + struct ovs_list events;
> + const struct ovsdb_cs_ops *ops;
> + void *ops_aux;
> +};
> +
> +static const struct ovsdb_cs_ops ovsdb_cs_server_ops;
> +
> +static void ovsdb_cs_db_destroy_tables(struct ovsdb_cs_db *);
> +static unsigned int ovsdb_cs_db_set_condition(
> + struct ovsdb_cs_db *, const char *db_name, const struct json *condition);
> +
> +static void ovsdb_cs_send_schema_request(struct ovsdb_cs *,
> + struct ovsdb_cs_db *);
> +static void ovsdb_cs_send_db_change_aware(struct ovsdb_cs *);
> +static bool ovsdb_cs_check_server_db(struct ovsdb_cs *);
> +static void ovsdb_cs_clear_server_rows(struct ovsdb_cs *);
> +static void ovsdb_cs_send_monitor_request(struct ovsdb_cs *,
> + struct ovsdb_cs_db *, int version);
> +static void ovsdb_cs_db_ack_condition(struct ovsdb_cs_db *db);
> +static void ovsdb_cs_db_sync_condition(struct ovsdb_cs_db *db);
> +
> +struct ovsdb_cs {
> + struct ovsdb_cs_db server;
> + struct ovsdb_cs_db data;
> +
> + /* Session state.
> + *
> + * 'state_seqno' is a snapshot of the session's sequence number as returned
> + * by jsonrpc_session_get_seqno(session), so if it differs from the value that
> + * function currently returns then the session has reconnected and the
> + * state machine must restart. */
> + struct jsonrpc_session *session; /* Connection to the server. */
> + char *remote; /* 'session' remote name. */
> + enum ovsdb_cs_state state; /* Current session state. */
> + unsigned int state_seqno; /* See above. */
> + struct json *request_id; /* JSON ID for request awaiting reply. */
> +
> + /* IDs of outstanding transactions. */
> + struct json **txns;
> + size_t n_txns, allocated_txns;
> +
> + /* Info for the _Server database. */
> + struct uuid cid;
> + struct hmap server_rows;
> +
> + /* Clustered servers. */
> + uint64_t min_index; /* Minimum allowed index, to avoid regression. */
> + bool leader_only; /* If true, do not connect to Raft followers. */
> + bool shuffle_remotes; /* If true, connect to servers in random order. */
> +};
> +
> +static void ovsdb_cs_transition_at(struct ovsdb_cs *, enum ovsdb_cs_state,
> + const char *where);
> +#define ovsdb_cs_transition(CS, STATE) \
> + ovsdb_cs_transition_at(CS, STATE, OVS_SOURCE_LOCATOR)
> +
> +static void ovsdb_cs_retry_at(struct ovsdb_cs *, const char *where);
> +#define ovsdb_cs_retry(CS) ovsdb_cs_retry_at(CS, OVS_SOURCE_LOCATOR)
> +
> +static struct vlog_rate_limit syntax_rl = VLOG_RATE_LIMIT_INIT(1, 5);
> +
> +static void ovsdb_cs_db_parse_monitor_reply(struct ovsdb_cs_db *,
> + const struct json *result,
> + int version);
> +static bool ovsdb_cs_db_parse_update_rpc(struct ovsdb_cs_db *,
> + const struct jsonrpc_msg *);
> +static bool ovsdb_cs_handle_monitor_canceled(struct ovsdb_cs *,
> + struct ovsdb_cs_db *,
> + const struct jsonrpc_msg *);
> +
> +static bool ovsdb_cs_db_process_lock_replies(struct ovsdb_cs_db *,
> + const struct jsonrpc_msg *);
> +static struct jsonrpc_msg *ovsdb_cs_db_compose_lock_request(
> + struct ovsdb_cs_db *);
> +static struct jsonrpc_msg *ovsdb_cs_db_compose_unlock_request(
> + struct ovsdb_cs_db *);
> +static void ovsdb_cs_db_parse_lock_reply(struct ovsdb_cs_db *,
> + const struct json *);
> +static bool ovsdb_cs_db_parse_lock_notify(struct ovsdb_cs_db *,
> + const struct json *params,
> + bool new_has_lock);
> +static void ovsdb_cs_send_cond_change(struct ovsdb_cs *);
> +
> +static bool ovsdb_cs_db_txn_process_reply(struct ovsdb_cs *,
> + const struct jsonrpc_msg *reply);
> +
> +/* Events. */
> +
> +void
> +ovsdb_cs_event_destroy(struct ovsdb_cs_event *event)
> +{
> + if (event) {
> + switch (event->type) {
> + case OVSDB_CS_EVENT_TYPE_RECONNECT:
> + case OVSDB_CS_EVENT_TYPE_LOCKED:
> + break;
> +
> + case OVSDB_CS_EVENT_TYPE_UPDATE:
> + json_destroy(event->update.table_updates);
> + break;
> +
> + case OVSDB_CS_EVENT_TYPE_TXN_REPLY:
> + jsonrpc_msg_destroy(event->txn_reply);
> + break;
> + }
> + free(event);
> + }
> +}
> +
> +/* Lifecycle. */
> +
> +static void
> +ovsdb_cs_db_init(struct ovsdb_cs_db *db, const char *db_name,
> + struct ovsdb_cs *parent, int max_version,
> + const struct ovsdb_cs_ops *ops, void *ops_aux)
> +{
> + *db = (struct ovsdb_cs_db) {
> + .cs = parent,
> + .db_name = xstrdup(db_name),
'db_name' is allocated with xstrdup() here but is not freed in
ovsdb_cs_db_destroy(); this is the straightforward leak mentioned above.
[...]
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev