On 7/13/21 12:40 AM, Ilya Maximets wrote:
> On 6/25/21 3:34 PM, Dumitru Ceara wrote:
>> On 6/12/21 4:00 AM, Ilya Maximets wrote:
>>> New database service model 'relay' that is needed to scale out
>>> read-mostly database access, e.g. ovn-controller connections to
>>> OVN_Southbound.
>>>
>>> In this service model ovsdb-server connects to existing OVSDB
>>> server and maintains in-memory copy of the database. It serves
>>> read-only transactions and monitor requests on its own, but
>>> forwards write transactions to the relay source.
>>>
>>> Key differences from the active-backup replication:
>>> - support for "write" transactions (next commit).
>>> - no on-disk storage. (probably, faster operation)
>>> - support for multiple remotes (connect to the clustered db).
>>> - doesn't try to keep connection as long as possible, but
>>> faster reconnects to other remotes to avoid missing updates.
>>> - No need to know the complete database schema beforehand,
>>> only the schema name.
>>> - can be used along with other standalone and clustered databases
>>> by the same ovsdb-server process. (doesn't turn the whole
>>> jsonrpc server to read-only mode)
>>> - supports modern version of monitors (monitor_cond_since),
>>> because based on ovsdb-cs.
>>> - could be chained, i.e. multiple relays could be connected
>>> one to another in a row or in a tree-like form.
>>> - doesn't increase availability.
>>> - cannot be converted to other service models or become a main
>>> active server.
>>>
>>> Signed-off-by: Ilya Maximets <[email protected]>
>>> ---
>>
>> I have some very nitpicky comments below (and an unrelated bug report),
>> nevertheless:
>>
>> Acked-by: Dumitru Ceara <[email protected]>
>>
>>> ovsdb/_server.ovsschema | 7 +-
>>> ovsdb/_server.xml | 16 +-
>>> ovsdb/automake.mk | 2 +
>>> ovsdb/execution.c | 5 +
>>> ovsdb/ovsdb-server.c | 97 ++++++++----
>>> ovsdb/ovsdb.c | 2 +
>>> ovsdb/ovsdb.h | 3 +
>>> ovsdb/relay.c | 339 ++++++++++++++++++++++++++++++++++++++++
>>> ovsdb/relay.h | 34 ++++
>>> 9 files changed, 464 insertions(+), 41 deletions(-)
>>> create mode 100644 ovsdb/relay.c
>>> create mode 100644 ovsdb/relay.h
>>>
>>> diff --git a/ovsdb/_server.ovsschema b/ovsdb/_server.ovsschema
>>> index a867e5cbf..e3d9d893b 100644
>>> --- a/ovsdb/_server.ovsschema
>>> +++ b/ovsdb/_server.ovsschema
>>> @@ -1,13 +1,14 @@
>>> {"name": "_Server",
>>> - "version": "1.1.0",
>>> - "cksum": "3236486585 698",
>>> + "version": "1.2.0",
>>> + "cksum": "3009684573 744",
>>> "tables": {
>>> "Database": {
>>> "columns": {
>>> "name": {"type": "string"},
>>> "model": {
>>> "type": {"key": {"type": "string",
>>> - "enum": ["set", ["standalone", "clustered"]]}}},
>>> + "enum": ["set",
>>> + ["standalone", "clustered",
>>> "relay"]]}}},
>>> "connected": {"type": "boolean"},
>>> "leader": {"type": "boolean"},
>>> "schema": {
>>> diff --git a/ovsdb/_server.xml b/ovsdb/_server.xml
>>> index 70cd22db7..414be6715 100644
>>> --- a/ovsdb/_server.xml
>>> +++ b/ovsdb/_server.xml
>>> @@ -60,12 +60,14 @@
>>>
>>> <column name="model">
>>> The storage model: <code>standalone</code> for a standalone or
>>> - active-backup database, <code>clustered</code> for a clustered
>>> database.
>>> + active-backup database, <code>clustered</code> for a clustered
>>> database,
>>> + <code>relay</code> for a relay database.
>>> </column>
>>>
>>> <column name="schema">
>>> The database schema, as a JSON string. In the case of a clustered
>>> - database, this is empty until it finishes joining its cluster.
>>> + database, this is empty until it finishes joining its cluster. In
>>> the
>>> + case of a relay database - until it connects to the relay source.
>>> </column>
>>>
>>> <group title="Clustered Databases">
>>> @@ -85,20 +87,20 @@
>>>
>>> <column name="leader">
>>> True if the database is the leader in its cluster. For a
>>> standalone or
>>> - active-backup database, this is always true.
>>> + active-backup database, this is always true. Always false for
>>> relay.
>>> </column>
>>>
>>> <column name="cid">
>>> The cluster ID for this database, which is the same for all of the
>>> - servers that host this particular clustered database. For a
>>> standalone
>>> - or active-backup database, this is empty.
>>> + servers that host this particular clustered database. For a
>>> + standalone, active-backup or relay database, this is empty.
>>> </column>
>>>
>>> <column name="sid">
>>> The server ID for this database, different for each server that
>>> hosts a
>>> particular clustered database. A server that hosts more than one
>>> clustered database will have a different <code>sid</code> in each
>>> one.
>>> - For a standalone or active-backup database, this is empty.
>>> + For a standalone, active-backup or relay database, this is empty.
>>> </column>
>>>
>>> <column name="index">
>>> @@ -112,7 +114,7 @@
>>> </p>
>>>
>>> <p>
>>> - For a standalone or active-backup database, this is empty.
>>> + For a standalone, active-backup or relay database, this is empty.
>>> </p>
>>> </column>
>>> </group>
>>> diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk
>>> index 446d6c136..05c8ebbdf 100644
>>> --- a/ovsdb/automake.mk
>>> +++ b/ovsdb/automake.mk
>>> @@ -34,6 +34,8 @@ ovsdb_libovsdb_la_SOURCES = \
>>> ovsdb/rbac.h \
>>> ovsdb/replication.c \
>>> ovsdb/replication.h \
>>> + ovsdb/relay.c \
>>> + ovsdb/relay.h \
>>> ovsdb/row.c \
>>> ovsdb/row.h \
>>> ovsdb/server.c \
>>> diff --git a/ovsdb/execution.c b/ovsdb/execution.c
>>> index f6150e944..dd2569055 100644
>>> --- a/ovsdb/execution.c
>>> +++ b/ovsdb/execution.c
>>> @@ -196,6 +196,11 @@ ovsdb_execute_compose(struct ovsdb *db, const struct
>>> ovsdb_session *session,
>>> "%s operation not allowed on "
>>> "table in reserved database %s",
>>> op_name, db->schema->name);
>>> + } else if (db->is_relay) {
>>> + error = ovsdb_error("not allowed",
>>> + "%s operation not allowed when "
>>> + "database server is in relay mode",
>>> + op_name);
>>> }
>>> }
>>> if (error) {
>>> diff --git a/ovsdb/ovsdb-server.c b/ovsdb/ovsdb-server.c
>>> index 23bd226a3..77b1fbe40 100644
>>> --- a/ovsdb/ovsdb-server.c
>>> +++ b/ovsdb/ovsdb-server.c
>>> @@ -44,6 +44,7 @@
>>> #include "openvswitch/poll-loop.h"
>>> #include "process.h"
>>> #include "replication.h"
>>> +#include "relay.h"
>>> #include "row.h"
>>> #include "simap.h"
>>> #include "openvswitch/shash.h"
>>> @@ -225,6 +226,8 @@ main_loop(struct server_config *config,
>>> }
>>> }
>>>
>>> + ovsdb_relay_run();
>>> +
>>> struct shash_node *next;
>>> SHASH_FOR_EACH_SAFE (node, next, all_dbs) {
>>> struct db *db = node->data;
>>> @@ -273,6 +276,8 @@ main_loop(struct server_config *config,
>>> replication_wait();
>>> }
>>>
>>> + ovsdb_relay_wait();
>>> +
>>> ovsdb_jsonrpc_server_wait(jsonrpc);
>>> unixctl_server_wait(unixctl);
>>> SHASH_FOR_EACH(node, all_dbs) {
>>> @@ -546,6 +551,9 @@ close_db(struct server_config *config, struct db *db,
>>> char *comment)
>>> {
>>> if (db) {
>>> ovsdb_jsonrpc_server_remove_db(config->jsonrpc, db->db, comment);
>>> + if (db->db->is_relay) {
>>> + ovsdb_relay_del_db(db->db);
>>> + }
>>> ovsdb_destroy(db->db);
>>> free(db->filename);
>>> free(db);
>>> @@ -554,6 +562,28 @@ close_db(struct server_config *config, struct db *db,
>>> char *comment)
>>> }
>>> }
>>>
>>> +static void
>>> +update_schema(struct ovsdb *db, const struct ovsdb_schema *schema, void
>>> *aux)
>>> +{
>>> + struct server_config *config = aux;
>>> +
>>> + if (!db->schema || strcmp(schema->version, db->schema->version)) {
>>> + ovsdb_jsonrpc_server_reconnect(
>>> + config->jsonrpc, false,
>>> + (db->schema
>>> + ? xasprintf("database %s schema changed", db->name)
>>> + : xasprintf("database %s connected to storage", db->name)));
>>> + }
>>> +
>>> + ovsdb_replace(db, ovsdb_create(ovsdb_schema_clone(schema), NULL));
>>> +
>>> + /* Force update to schema in _Server database. */
>>> + struct db *dbp = shash_find_data(config->all_dbs, db->name);
>>> + if (dbp) {
>>> + dbp->row_uuid = UUID_ZERO;
>>> + }
>>> +}
>>> +
>>> static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>>> parse_txn(struct server_config *config, struct db *db,
>>> const struct ovsdb_schema *schema, const struct json *txn_json,
>>> @@ -575,21 +605,7 @@ parse_txn(struct server_config *config, struct db *db,
>>> if (error) {
>>> return error;
>>> }
>>> -
>>> - if (!db->db->schema ||
>>> - strcmp(schema->version, db->db->schema->version)) {
>>> - ovsdb_jsonrpc_server_reconnect(
>>> - config->jsonrpc, false,
>>> - (db->db->schema
>>> - ? xasprintf("database %s schema changed", db->db->name)
>>> - : xasprintf("database %s connected to storage",
>>> - db->db->name)));
>>> - }
>>> -
>>> - ovsdb_replace(db->db, ovsdb_create(ovsdb_schema_clone(schema),
>>> NULL));
>>> -
>>> - /* Force update to schema in _Server database. */
>>> - db->row_uuid = UUID_ZERO;
>>> + update_schema(db->db, schema, config);
>>> }
>>>
>>> if (txn_json) {
>>> @@ -660,27 +676,42 @@ add_db(struct server_config *config, struct db *db)
>>> static struct ovsdb_error * OVS_WARN_UNUSED_RESULT
>>> open_db(struct server_config *config, const char *filename)
>>> {
>>> + bool is_relay = !strncmp(filename, "relay:", 6);
>>> + const char *relay_remotes = NULL;
>>> + struct ovsdb_storage *storage;
>>> + struct ovsdb_error *error;
>>> struct db *db;
>>> + char *name;
>>> +
>>> + if (!is_relay) {
>>> + /* If we know that the file is already open, return a good error
>>> + * message. Otherwise, if the file is open, we'll fail later on
>>> with
>>> + * a harder to interpret file locking error. */
>>> + if (is_already_open(config, filename)) {
>>> + return ovsdb_error(NULL, "%s: already open", filename);
>>> + }
>>>
>>> - /* If we know that the file is already open, return a good error
>>> message.
>>> - * Otherwise, if the file is open, we'll fail later on with a harder to
>>> - * interpret file locking error. */
>>> - if (is_already_open(config, filename)) {
>>> - return ovsdb_error(NULL, "%s: already open", filename);
>>> - }
>>> + error = ovsdb_storage_open(filename, true, &storage);
>>> + if (error) {
>>> + return error;
>>> + }
>>> + name = xstrdup(filename);
>>> + } else {
>>> + /* Parsing the relay in format 'relay:DB_NAME:<list of remotes>'. */
>>> + relay_remotes = strchr(filename + 6, ':');
>>
>> Nit: s/6/strlen("relay:")/
>
> OK, but in this case we'll need a variable for the "relay:" string too.
> I'll do something with that.
>
>>
>>>
>>> - struct ovsdb_storage *storage;
>>> - struct ovsdb_error *error;
>>> - error = ovsdb_storage_open(filename, true, &storage);
>>> - if (error) {
>>> - return error;
>>> + if (!relay_remotes || relay_remotes[0] == '\0') {
>>> + return ovsdb_error(NULL, "%s: invalid syntax", filename);
>>> + }
>>> + name = xmemdup0(filename, relay_remotes - filename);
>>> + storage = ovsdb_storage_create_unbacked(name + 6);
>>
>> Same nit here.
>
> ok.
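
For reference, a minimal standalone sketch of the
'relay:DB_NAME:<list of remotes>' parsing with the prefix length derived
from a named constant instead of a literal 6. The names relay_prefix,
RELAY_PREFIX_LEN and parse_relay_spec() are hypothetical, and plain
malloc() stands in for xmemdup0(). Unlike the hunk above, the sketch also
rejects an empty DB_NAME and an empty remote list; that is a choice made
for the example only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical named prefix; the quoted hunk hard-codes the length 6. */
static const char relay_prefix[] = "relay:";
#define RELAY_PREFIX_LEN (sizeof relay_prefix - 1)

/* Splits 'relay:DB_NAME:REMOTES'.  On success stores a malloc'ed copy of
 * DB_NAME in '*db_name', points '*remotes' into 'spec' and returns true.
 * On failure returns false and leaves both outputs untouched. */
static bool
parse_relay_spec(const char *spec, char **db_name, const char **remotes)
{
    assert(spec && db_name && remotes);

    if (strncmp(spec, relay_prefix, RELAY_PREFIX_LEN)) {
        return false;               /* Not a relay specification. */
    }

    const char *name = spec + RELAY_PREFIX_LEN;
    const char *colon = strchr(name, ':');
    if (!colon || colon == name || colon[1] == '\0') {
        return false;               /* Missing name or missing remotes. */
    }

    size_t len = colon - name;
    char *copy = malloc(len + 1);
    if (!copy) {
        return false;
    }
    memcpy(copy, name, len);
    copy[len] = '\0';

    *db_name = copy;
    *remotes = colon + 1;           /* Skip the ':'. */
    return true;
}
```

The caller owns the returned name and must free() it; the remotes pointer
stays valid only as long as the original string does.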
>
>>
>>> + relay_remotes++; /* Skip the ':'. */
>>> }
>>> -
>>> db = xzalloc(sizeof *db);
>>> - db->filename = xstrdup(filename);
>>> + db->filename = name;
>>>
>>> struct ovsdb_schema *schema;
>>> - if (ovsdb_storage_is_clustered(storage)) {
>>> + if (is_relay || ovsdb_storage_is_clustered(storage)) {
>>> schema = NULL;
>>> } else {
>>> struct json *txn_json;
>>
>> Not introduced by this patch but I noticed that in this "else" we might
>> return and leak 'db' and 'db->filename'.
>
> Yes, looks so. But this should be fixed by a separate patch and backported.
>
Will you be sending a patch or shall I?
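
To illustrate the kind of fix meant here, a toy sketch of the usual
single-exit cleanup pattern. This is not the actual open_db() code: the
struct toy_db type, toy_open_db() and the 'fail_read' flag are all
hypothetical stand-ins, with 'fail_read' simulating the early return in
the "else" branch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Toy stand-in for 'struct db'; only the field relevant to the leak. */
struct toy_db {
    char *filename;
};

/* Returns NULL on success and stores the new object in '*dbp'.  On failure
 * returns a static error string, frees everything allocated so far and
 * leaves '*dbp' set to NULL. */
static const char *
toy_open_db(const char *filename, bool fail_read, struct toy_db **dbp)
{
    const char *error = NULL;
    struct toy_db *db;

    assert(filename && dbp);
    *dbp = NULL;

    db = calloc(1, sizeof *db);
    if (!db) {
        return "out of memory";
    }

    db->filename = malloc(strlen(filename) + 1);
    if (!db->filename) {
        error = "out of memory";
        goto cleanup;
    }
    strcpy(db->filename, filename);

    if (fail_read) {
        /* Simulated failure after 'db' and 'db->filename' exist. */
        error = "failed to read schema";
        goto cleanup;
    }

    *dbp = db;
    return NULL;

cleanup:
    free(db->filename);             /* free(NULL) is a no-op. */
    free(db);
    return error;
}
```

Routing every failure through one label keeps the frees in a single
place, so a later early return cannot skip them.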
>>
>>> @@ -716,6 +747,10 @@ open_db(struct server_config *config, const char
>>> *filename)
>>> }
>>>
>>> add_db(config, db);
>>> +
>>> + if (is_relay) {
>>> + ovsdb_relay_add_db(db->db, relay_remotes, update_schema, config);
>>> + }
>>> return NULL;
>>> }
>>>
>>> @@ -1153,11 +1188,11 @@ update_database_status(struct ovsdb_row *row,
>>> struct db *db)
>>> {
>>> ovsdb_util_write_string_column(row, "name", db->db->name);
>>> ovsdb_util_write_string_column(row, "model",
>>> -
>>> ovsdb_storage_get_model(db->db->storage));
>>> + db->db->is_relay ? "relay" :
>>> ovsdb_storage_get_model(db->db->storage));
>>> ovsdb_util_write_bool_column(row, "connected",
>>>
>>> ovsdb_storage_is_connected(db->db->storage));
>>> ovsdb_util_write_bool_column(row, "leader",
>>> - ovsdb_storage_is_leader(db->db->storage));
>>> + db->db->is_relay ? false :
>>> ovsdb_storage_is_leader(db->db->storage));
>>> ovsdb_util_write_uuid_column(row, "cid",
>>> ovsdb_storage_get_cid(db->db->storage));
>>> ovsdb_util_write_uuid_column(row, "sid",
>>> diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c
>>> index e019631e9..999cd0d75 100644
>>> --- a/ovsdb/ovsdb.c
>>> +++ b/ovsdb/ovsdb.c
>>> @@ -421,6 +421,8 @@ ovsdb_create(struct ovsdb_schema *schema, struct
>>> ovsdb_storage *storage)
>>> ovs_list_init(&db->triggers);
>>> db->run_triggers_now = db->run_triggers = false;
>>>
>>> + db->is_relay = false;
>>> +
>>> shash_init(&db->tables);
>>> if (schema) {
>>> SHASH_FOR_EACH (node, &schema->tables) {
>>> diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h
>>> index 72e127c84..16bd5f5ec 100644
>>> --- a/ovsdb/ovsdb.h
>>> +++ b/ovsdb/ovsdb.h
>>> @@ -91,6 +91,9 @@ struct ovsdb {
>>> bool need_txn_history; /* Need to maintain history of
>>> transactions. */
>>> unsigned int n_txn_history; /* Current number of history transactions.
>>> */
>>> struct ovs_list txn_history; /* Contains "struct
>>> ovsdb_txn_history_node. */
>>> +
>>> + /* Relay mode. */
>>> + bool is_relay;
>>> };
>>>
>>> struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *);
>>> diff --git a/ovsdb/relay.c b/ovsdb/relay.c
>>> new file mode 100644
>>> index 000000000..5f423a0b9
>>> --- /dev/null
>>> +++ b/ovsdb/relay.c
>>> @@ -0,0 +1,339 @@
>>> +/*
>>> + * Copyright (c) 2021, Red Hat, Inc.
>>> + *
>>> + * Licensed under the Apache License, Version 2.0 (the "License");
>>> + * you may not use this file except in compliance with the License.
>>> + * You may obtain a copy of the License at:
>>> + *
>>> + * http://www.apache.org/licenses/LICENSE-2.0
>>> + *
>>> + * Unless required by applicable law or agreed to in writing, software
>>> + * distributed under the License is distributed on an "AS IS" BASIS,
>>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>>> + * See the License for the specific language governing permissions and
>>> + * limitations under the License.
>>> + */
>>> +
>>> +#include <config.h>
>>> +
>>> +#include "relay.h"
>>> +
>>> +#include "coverage.h"
>>> +#include "jsonrpc.h"
>>> +#include "openvswitch/hmap.h"
>>> +#include "openvswitch/json.h"
>>> +#include "openvswitch/list.h"
>>> +#include "openvswitch/poll-loop.h"
>>> +#include "openvswitch/shash.h"
>>> +#include "openvswitch/vlog.h"
>>> +#include "ovsdb.h"
>>> +#include "ovsdb-cs.h"
>>> +#include "ovsdb-error.h"
>>> +#include "row.h"
>>> +#include "table.h"
>>> +#include "transaction.h"
>>> +#include "util.h"
>>> +
>>> +VLOG_DEFINE_THIS_MODULE(relay);
>>> +
>>> +static struct shash relay_dbs = SHASH_INITIALIZER(&relay_dbs);
>>> +
>>> +struct relay_ctx {
>>> + struct ovsdb *db;
>>> + struct ovsdb_cs *cs;
>>> +
>>> + /* Schema updates. */
>>> + struct ovsdb_schema *new_schema;
>>> + schema_change_callback schema_change_cb;
>>> + void *schema_change_aux;
>>> +};
>>> +
>>> +static struct json *
>>> +ovsdb_relay_compose_monitor_request(const struct json *schema_json, void
>>> *ctx_)
>>> +{
>>> + struct json *monitor_request = json_object_create();
>>> + struct relay_ctx *ctx = ctx_;
>>> + struct ovsdb_schema *schema;
>>> + struct ovsdb *db = ctx->db;
>>> + struct ovsdb_error *error;
>>> +
>>> + error = ovsdb_schema_from_json(schema_json, &schema);
>>> + if (error) {
>>> + char *msg = ovsdb_error_to_string_free(error);
>>> + VLOG_WARN("%s: Failed to parse db schema: %s", db->name, msg);
>>> + free(msg);
>>> + /* There is nothing we can really do here. */
>>> + return monitor_request;
>>> + }
>>> +
>>> + const struct shash_node *node;
>>> + SHASH_FOR_EACH (node, &schema->tables) {
>>> + struct json *monitor_request_array = json_array_create_empty();
>>> + struct ovsdb_table_schema *table = node->data;
>>> +
>>> + json_array_add(monitor_request_array, json_object_create());
>>> + json_object_put(monitor_request, table->name,
>>> monitor_request_array);
>>> + }
>>> +
>>> + if (!db->schema || !ovsdb_schema_equal(schema, db->schema)) {
>>> + VLOG_DBG("database %s schema changed.", db->name);
>>> + if (ctx->new_schema) {
>>> + ovsdb_schema_destroy(ctx->new_schema);
>>> + }
>>> + /* We will update the schema later when we will receive actual data
>>> + * from the monitor in order to avoid sitting with an empty
>>> database
>>> + * until the monitor reply. */
>>> + ctx->new_schema = schema;
>>> + } else {
>>> + ovsdb_schema_destroy(schema);
>>> + }
>>
>> This part feels a bit like a hack, doesn't have much to do with
>> composing the monitor request. OTOH, I'm not sure it's worth adding a
>> new ovsdb-cs op and callback to handle schema changes.
>
> I thought about that, but this callback already receives the schema,
> and if we'll add another callback, there will be two callbacks that
> should be called one by one with same arguments. Not a pretty API
> either. So, I'd keep as is for now.
>
Sure, fine by me.
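
For anyone following along, the deferred schema update discussed above can
be modelled roughly as below. This is a toy sketch under loud assumptions:
a schema is reduced to a plain string, and struct toy_relay_ctx,
toy_schema_received() and toy_data_received() are hypothetical names, not
part of ovsdb-cs or the patch. A pending schema is stored only when it
differs from the one in use, and it is applied when monitor data arrives.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Toy model: a schema is just a heap-allocated string. */
struct toy_relay_ctx {
    char *schema;       /* Schema currently in use, or NULL. */
    char *new_schema;   /* Schema waiting for the first monitor data. */
};

static char *
toy_strdup(const char *s)
{
    char *copy = malloc(strlen(s) + 1);
    if (copy) {
        strcpy(copy, s);
    }
    return copy;
}

/* Called when the relay source reports its schema.  Remembers the schema
 * as pending only if it differs from the one in use. */
static void
toy_schema_received(struct toy_relay_ctx *ctx, const char *schema)
{
    assert(ctx && schema);

    if (!ctx->schema || strcmp(schema, ctx->schema)) {
        free(ctx->new_schema);
        ctx->new_schema = toy_strdup(schema);
    }
}

/* Called when monitor data arrives.  Switches to the pending schema, if
 * any, and returns true if a switch happened. */
static bool
toy_data_received(struct toy_relay_ctx *ctx)
{
    assert(ctx);

    if (!ctx->new_schema) {
        return false;
    }
    free(ctx->schema);
    ctx->schema = ctx->new_schema;
    ctx->new_schema = NULL;
    return true;
}
```

Until toy_data_received() runs, readers keep seeing the old schema and
data, which is the point of deferring the switch.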
>>
>>> + return monitor_request;
>>> +}
>>> +
>>> +static struct ovsdb_cs_ops relay_cs_ops = {
>>> + .compose_monitor_requests = ovsdb_relay_compose_monitor_request,
>>> +};
>>> +
>>> +void
>>> +ovsdb_relay_add_db(struct ovsdb *db, const char *remote,
>>> + schema_change_callback schema_change_cb,
>>> + void *schema_change_aux)
>>> +{
>>> + struct relay_ctx *ctx;
>>> +
>>> + if (!db || !remote) {
>>> + return;
>>> + }
>>> +
>>> + ctx = shash_find_data(&relay_dbs, db->name);
>>> + if (ctx) {
>>> + ovsdb_cs_set_remote(ctx->cs, remote, true);
>>
>> We log when we add the database, would it also make sense to log the
>> remote update here?
>
> OK. This path is not used in current code, but we can log the
> change for the future.
>
>>
>>> + return;
>>> + }
>>> +
>>> + db->is_relay = true;
>>> + ctx = xzalloc(sizeof *ctx);
>>> + ctx->schema_change_cb = schema_change_cb;
>>> + ctx->schema_change_aux = schema_change_aux;
>>> + ctx->db = db;
>>> + ctx->cs = ovsdb_cs_create(db->name, 3, &relay_cs_ops, ctx);
>>> + shash_add(&relay_dbs, db->name, ctx);
>>> + ovsdb_cs_set_leader_only(ctx->cs, false);
>>> + ovsdb_cs_set_remote(ctx->cs, remote, true);
>>> +
>>> + VLOG_DBG("added database: %s, %s", db->name, remote);
>>> +}
>>> +
>>> +void
>>> +ovsdb_relay_del_db(struct ovsdb *db)
>>> +{
>>> + struct relay_ctx *ctx;
>>> +
>>> + if (!db) {
>>> + return;
>>> + }
>>> +
>>> + ctx = shash_find_and_delete(&relay_dbs, db->name);
>>> + if (!ctx) {
>>
>> Should we assert here? Or at least VLOG_WARN()?
>>
>
> OK. Warning is fine.
>
Thanks,
Dumitru