On 14/07/2021 14:50, Ilya Maximets wrote: > The current version of ovsdb relay allows scaling out read-only > access to the primary database. However, many clients are not > read-only but read-mostly. For example, ovn-controller. > > In order to scale out database access for this case, ovsdb-server > needs to process transactions that are not read-only. Relay is not > allowed to do that, i.e. not allowed to modify the database, but it > can act like a proxy and forward transactions that include database > modifications to the primary server and forward replies back to a > client. At the same time it may serve read-only transactions and > monitor requests by itself, greatly reducing the load on the primary > server. > > This configuration will slightly increase transaction latency, but > it's not very important for read-mostly use cases. > > Implementation details: > With this change, instead of creating a trigger to commit the > transaction, ovsdb-server will create a trigger for transaction > forwarding. Later, ovsdb_relay_run() will send all new transactions > to the relay source. Once the transaction reply is received from the > relay source, the ovsdb-relay module will update the state of the > transaction forwarding with the reply. After that, trigger_run() > will complete the trigger and jsonrpc_server_run() will send the > reply back to the client. Since the transaction reply from the relay > source will be received after all the updates, the client will receive > all the updates before receiving the transaction reply, as it is in > a normal scenario with other database models. 
> > Acked-by: Dumitru Ceara <[email protected]> > Signed-off-by: Ilya Maximets <[email protected]> > --- > ovsdb/automake.mk | 2 + > ovsdb/execution.c | 18 ++-- > ovsdb/ovsdb.c | 9 ++ > ovsdb/ovsdb.h | 8 +- > ovsdb/relay.c | 12 ++- > ovsdb/transaction-forward.c | 182 ++++++++++++++++++++++++++++++++++++ > ovsdb/transaction-forward.h | 44 +++++++++ > ovsdb/trigger.c | 49 ++++++++-- > ovsdb/trigger.h | 41 ++++---- > tests/ovsdb-server.at | 85 ++++++++++++++++- > 10 files changed, 411 insertions(+), 39 deletions(-) > create mode 100644 ovsdb/transaction-forward.c > create mode 100644 ovsdb/transaction-forward.h > > diff --git a/ovsdb/automake.mk b/ovsdb/automake.mk > index 05c8ebbdf..62cc02686 100644 > --- a/ovsdb/automake.mk > +++ b/ovsdb/automake.mk > @@ -48,6 +48,8 @@ ovsdb_libovsdb_la_SOURCES = \ > ovsdb/trigger.h \ > ovsdb/transaction.c \ > ovsdb/transaction.h \ > + ovsdb/transaction-forward.c \ > + ovsdb/transaction-forward.h \ > ovsdb/ovsdb-util.c \ > ovsdb/ovsdb-util.h > ovsdb_libovsdb_la_CFLAGS = $(AM_CFLAGS) > diff --git a/ovsdb/execution.c b/ovsdb/execution.c > index dd2569055..f9b8067d0 100644 > --- a/ovsdb/execution.c > +++ b/ovsdb/execution.c > @@ -99,7 +99,8 @@ lookup_executor(const char *name, bool *read_only) > } > > /* On success, returns a transaction and stores the results to return to the > - * client in '*resultsp'. > + * client in '*resultsp'. If 'forwarding_needed' is nonnull and transaction > + * needs to be forwarded (in relay mode), sets '*forwarding_needed' to true. > * > * On failure, returns NULL. If '*resultsp' is nonnull, then it is the > results > * to return to the client. 
If '*resultsp' is null, then the execution > failed > @@ -111,7 +112,8 @@ ovsdb_execute_compose(struct ovsdb *db, const struct > ovsdb_session *session, > const struct json *params, bool read_only, > const char *role, const char *id, > long long int elapsed_msec, long long int > *timeout_msec, > - bool *durable, struct json **resultsp) > + bool *durable, bool *forwarding_needed, > + struct json **resultsp) > { > struct ovsdb_execution x; > struct ovsdb_error *error; > @@ -120,6 +122,9 @@ ovsdb_execute_compose(struct ovsdb *db, const struct > ovsdb_session *session, > size_t i; > > *durable = false; > + if (forwarding_needed) { > + *forwarding_needed = false; > + } > if (params->type != JSON_ARRAY > || !params->array.n > || params->array.elems[0]->type != JSON_STRING > @@ -196,11 +201,8 @@ ovsdb_execute_compose(struct ovsdb *db, const struct > ovsdb_session *session, > "%s operation not allowed on " > "table in reserved database %s", > op_name, db->schema->name); > - } else if (db->is_relay) { > - error = ovsdb_error("not allowed", > - "%s operation not allowed when " > - "database server is in relay mode", > - op_name); > + } else if (db->is_relay && forwarding_needed) { > + *forwarding_needed = true; > } > } > if (error) { > @@ -245,7 +247,7 @@ ovsdb_execute(struct ovsdb *db, const struct > ovsdb_session *session, > struct json *results; > struct ovsdb_txn *txn = ovsdb_execute_compose( > db, session, params, read_only, role, id, elapsed_msec, timeout_msec, > - &durable, &results); > + &durable, NULL, &results); > if (!txn) { > return results; > } > diff --git a/ovsdb/ovsdb.c b/ovsdb/ovsdb.c > index 999cd0d75..126d16a2f 100644 > --- a/ovsdb/ovsdb.c > +++ b/ovsdb/ovsdb.c > @@ -33,6 +33,7 @@ > #include "table.h" > #include "timeval.h" > #include "transaction.h" > +#include "transaction-forward.h" > #include "trigger.h" > > #include "openvswitch/vlog.h" > @@ -422,6 +423,8 @@ ovsdb_create(struct ovsdb_schema *schema, struct > ovsdb_storage *storage) > 
> db->run_triggers_now = db->run_triggers = false; > > db->is_relay = false; > + ovs_list_init(&db->txn_forward_new); > + hmap_init(&db->txn_forward_sent); > > shash_init(&db->tables); > if (schema) { > @@ -465,6 +468,12 @@ ovsdb_destroy(struct ovsdb *db) > /* Destroy txn history. */ > ovsdb_txn_history_destroy(db); > > + /* Cancel all the forwarded transactions. There should not be > + * any as all triggers should be already cancelled. */ > + ovsdb_txn_forward_cancel_all(db, false); > + ovs_assert(hmap_is_empty(&db->txn_forward_sent)); > + hmap_destroy(&db->txn_forward_sent); > + > /* The caller must ensure that no triggers remain. */ > ovs_assert(ovs_list_is_empty(&db->triggers)); > > diff --git a/ovsdb/ovsdb.h b/ovsdb/ovsdb.h > index 16bd5f5ec..4a7bd0f0e 100644 > --- a/ovsdb/ovsdb.h > +++ b/ovsdb/ovsdb.h > @@ -93,7 +93,11 @@ struct ovsdb { > struct ovs_list txn_history; /* Contains "struct ovsdb_txn_history_node. > */ > > /* Relay mode. */ > - bool is_relay; > + bool is_relay; /* True, if database is in relay mode. */ > + /* List that holds transactions waiting to be forwarded to the server. */ > + struct ovs_list txn_forward_new; > + /* Hash map for transactions that are already sent and waits for reply. 
> */ > + struct hmap txn_forward_sent; > }; > > struct ovsdb *ovsdb_create(struct ovsdb_schema *, struct ovsdb_storage *); > @@ -107,7 +111,7 @@ struct ovsdb_txn *ovsdb_execute_compose( > struct ovsdb *, const struct ovsdb_session *, const struct json *params, > bool read_only, const char *role, const char *id, > long long int elapsed_msec, long long int *timeout_msec, > - bool *durable, struct json **); > + bool *durable, bool *forwarding_needed, struct json **); > > struct json *ovsdb_execute(struct ovsdb *, const struct ovsdb_session *, > const struct json *params, bool read_only, > diff --git a/ovsdb/relay.c b/ovsdb/relay.c > index 740b34ddf..df9906bda 100644 > --- a/ovsdb/relay.c > +++ b/ovsdb/relay.c > @@ -32,6 +32,7 @@ > #include "row.h" > #include "table.h" > #include "transaction.h" > +#include "transaction-forward.h" > #include "util.h" > > VLOG_DEFINE_THIS_MODULE(relay); > @@ -302,6 +303,7 @@ ovsdb_relay_run(void) > struct relay_ctx *ctx = node->data; > struct ovs_list events; > > + ovsdb_txn_forward_run(ctx->db, ctx->cs); > ovsdb_cs_run(ctx->cs, &events); > > struct ovsdb_cs_event *event; > @@ -313,7 +315,9 @@ ovsdb_relay_run(void) > > switch (event->type) { > case OVSDB_CS_EVENT_TYPE_RECONNECT: > - /* Nothing to do. */ > + /* Cancelling all the transactions that were already sent but > + * not replied yet as they might be lost. */ > + ovsdb_txn_forward_cancel_all(ctx->db, true); > break; > > case OVSDB_CS_EVENT_TYPE_UPDATE: > @@ -321,8 +325,11 @@ ovsdb_relay_run(void) > break; > > case OVSDB_CS_EVENT_TYPE_TXN_REPLY: > + ovsdb_txn_forward_complete(ctx->db, event->txn_reply); > + break; > + > case OVSDB_CS_EVENT_TYPE_LOCKED: > - /* Not expected. 
*/ > + VLOG_WARN("%s: Unexpected LOCKED event.", ctx->db->name); > break; > } > ovsdb_cs_event_destroy(event); > @@ -339,5 +346,6 @@ ovsdb_relay_wait(void) > struct relay_ctx *ctx = node->data; > > ovsdb_cs_wait(ctx->cs); > + ovsdb_txn_forward_wait(ctx->db, ctx->cs); > } > } > diff --git a/ovsdb/transaction-forward.c b/ovsdb/transaction-forward.c > new file mode 100644 > index 000000000..8ff12ef4b > --- /dev/null > +++ b/ovsdb/transaction-forward.c > @@ -0,0 +1,182 @@ > +/* > + * Copyright (c) 2021, Red Hat, Inc. > + * > + * Licensed under the Apache License, Version 2.0 (the "License"); > + * you may not use this file except in compliance with the License. > + * You may obtain a copy of the License at: > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +#include <config.h> > + > +#include "transaction-forward.h" > + > +#include "coverage.h" > +#include "jsonrpc.h" > +#include "openvswitch/hmap.h" > +#include "openvswitch/json.h" > +#include "openvswitch/list.h" > +#include "openvswitch/poll-loop.h" > +#include "openvswitch/vlog.h" > +#include "ovsdb.h" > +#include "ovsdb-cs.h" > +#include "util.h" > + > +VLOG_DEFINE_THIS_MODULE(transaction_forward); > + > +COVERAGE_DEFINE(txn_forward_cancel); > +COVERAGE_DEFINE(txn_forward_complete); > +COVERAGE_DEFINE(txn_forward_create); > +COVERAGE_DEFINE(txn_forward_sent); > + > +struct ovsdb_txn_forward { > + struct ovs_list new_node; /* In 'txn_forward_new' of struct ovsdb. */ > + struct hmap_node sent_node; /* In 'txn_forward_sent' of struct ovsdb. */ > + struct json *id; /* 'id' of the forwarded transaction. */ > + struct jsonrpc_msg *request; /* Original request. 
*/ > + struct jsonrpc_msg *reply; /* Reply from the server. */ > +}; > + > +struct ovsdb_txn_forward * > +ovsdb_txn_forward_create(struct ovsdb *db, const struct jsonrpc_msg *request) > +{ > + struct ovsdb_txn_forward *txn_fwd = xzalloc(sizeof *txn_fwd); > + > + COVERAGE_INC(txn_forward_create); > + txn_fwd->request = jsonrpc_msg_clone(request); > + ovs_list_push_back(&db->txn_forward_new, &txn_fwd->new_node); > + > + return txn_fwd; > +} > + > +static void > +ovsdb_txn_forward_unlist(struct ovsdb *db, struct ovsdb_txn_forward *txn_fwd) > +{ > + if (!ovs_list_is_empty(&txn_fwd->new_node)) { > + ovs_list_remove(&txn_fwd->new_node); > + ovs_list_init(&txn_fwd->new_node); > + } > + if (!hmap_node_is_null(&txn_fwd->sent_node)) { > + hmap_remove(&db->txn_forward_sent, &txn_fwd->sent_node); > + hmap_node_nullify(&txn_fwd->sent_node); > + } > +} > + > +void > +ovsdb_txn_forward_destroy(struct ovsdb *db, struct ovsdb_txn_forward > *txn_fwd) > +{ > + if (!txn_fwd) { > + return; > + } > + > + ovsdb_txn_forward_unlist(db, txn_fwd); > + json_destroy(txn_fwd->id); > + jsonrpc_msg_destroy(txn_fwd->request); > + jsonrpc_msg_destroy(txn_fwd->reply); > + free(txn_fwd); > +} > + > +bool > +ovsdb_txn_forward_is_complete(const struct ovsdb_txn_forward *txn_fwd) > +{ > + return txn_fwd->reply != NULL; > +} > + > +void > +ovsdb_txn_forward_complete(struct ovsdb *db, const struct jsonrpc_msg *reply) > +{ > + struct ovsdb_txn_forward *t; > + size_t hash = json_hash(reply->id, 0); > + > + HMAP_FOR_EACH_WITH_HASH (t, sent_node, hash, &db->txn_forward_sent) { > + if (json_equal(reply->id, t->id)) { > + COVERAGE_INC(txn_forward_complete); > + t->reply = jsonrpc_msg_clone(reply); > + > + /* Replacing id with the id of the original request. 
*/ > + json_destroy(t->reply->id); > + t->reply->id = json_clone(t->request->id); > + > + hmap_remove(&db->txn_forward_sent, &t->sent_node); > + hmap_node_nullify(&t->sent_node); > + > + db->run_triggers_now = db->run_triggers = true; > + return; > + } > + } > +} > + > +struct jsonrpc_msg * > +ovsdb_txn_forward_steal_reply(struct ovsdb_txn_forward *txn_fwd) > +{ > + struct jsonrpc_msg *reply = txn_fwd->reply; > + > + txn_fwd->reply = NULL; > + return reply; > +} > + > +void > +ovsdb_txn_forward_run(struct ovsdb *db, struct ovsdb_cs *cs) > +{ > + struct ovsdb_txn_forward *t, *next; > + > + /* Send all transactions that needs to be forwarded. */ > + LIST_FOR_EACH_SAFE (t, next, new_node, &db->txn_forward_new) { > + if (!ovsdb_cs_may_send_transaction(cs)) { > + break; > + } > + ovs_assert(!strcmp(t->request->method, "transact")); > + t->id = ovsdb_cs_send_transaction(cs, > json_clone(t->request->params)); > + if (t->id) { > + COVERAGE_INC(txn_forward_sent); > + ovs_list_remove(&t->new_node); > + ovs_list_init(&t->new_node); > + hmap_insert(&db->txn_forward_sent, &t->sent_node, > + json_hash(t->id, 0)); > + } > + } > +} > + > +void > +ovsdb_txn_forward_wait(struct ovsdb *db, struct ovsdb_cs *cs) > +{ > + if (ovsdb_cs_may_send_transaction(cs) > + && !ovs_list_is_empty(&db->txn_forward_new)) { > + poll_immediate_wake(); > + } > +} > + > +void > +ovsdb_txn_forward_cancel(struct ovsdb *db, struct ovsdb_txn_forward *txn_fwd) > +{ > + COVERAGE_INC(txn_forward_cancel); > + jsonrpc_msg_destroy(txn_fwd->reply); > + txn_fwd->reply = jsonrpc_create_error(json_string_create("canceled"), > + txn_fwd->request->id); > + ovsdb_txn_forward_unlist(db, txn_fwd); > +} > + > +void > +ovsdb_txn_forward_cancel_all(struct ovsdb *db, bool sent_only) > +{ > + struct ovsdb_txn_forward *t, *next; > + > + HMAP_FOR_EACH_SAFE (t, next, sent_node, &db->txn_forward_sent) { > + ovsdb_txn_forward_cancel(db, t); > + } > + > + if (sent_only) { > + return; > + } > + > + LIST_FOR_EACH_SAFE (t, next, 
new_node, &db->txn_forward_new) { > + ovsdb_txn_forward_cancel(db, t); > + } > +} > diff --git a/ovsdb/transaction-forward.h b/ovsdb/transaction-forward.h > new file mode 100644 > index 000000000..6788d3824 > --- /dev/null > +++ b/ovsdb/transaction-forward.h > @@ -0,0 +1,44 @@ > +/* > + * Copyright (c) 2021, Red Hat, Inc. > + * > + * Licensed under the Apache License, Version 2.0 (the "License"); > + * you may not use this file except in compliance with the License. > + * You may obtain a copy of the License at: > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +#ifndef OVSDB_TXN_FORWARD_H > +#define OVSDB_TXN_FORWARD_H 1 > + > +#include <stdbool.h> > + > +struct ovsdb; > +struct ovsdb_cs; > +struct ovsdb_txn_forward; > +struct jsonrpc_session; > +struct jsonrpc_msg; > + > +struct ovsdb_txn_forward *ovsdb_txn_forward_create( > + struct ovsdb *, const struct jsonrpc_msg *request); > +void ovsdb_txn_forward_destroy(struct ovsdb *, struct ovsdb_txn_forward *); > + > +bool ovsdb_txn_forward_is_complete(const struct ovsdb_txn_forward *); > +void ovsdb_txn_forward_complete(struct ovsdb *, > + const struct jsonrpc_msg *reply); > + > +struct jsonrpc_msg *ovsdb_txn_forward_steal_reply(struct ovsdb_txn_forward > *); > + > +void ovsdb_txn_forward_run(struct ovsdb *, struct ovsdb_cs *); > +void ovsdb_txn_forward_wait(struct ovsdb *, struct ovsdb_cs *); > + > +void ovsdb_txn_forward_cancel(struct ovsdb *, struct ovsdb_txn_forward *); > +void ovsdb_txn_forward_cancel_all(struct ovsdb *, bool sent_only); > + > +#endif /* OVSDB_TXN_FORWARD_H */ > diff --git a/ovsdb/trigger.c b/ovsdb/trigger.c > index 0372302af..726c138bf 100644 
> --- a/ovsdb/trigger.c > +++ b/ovsdb/trigger.c > @@ -28,6 +28,7 @@ > #include "openvswitch/poll-loop.h" > #include "server.h" > #include "transaction.h" > +#include "transaction-forward.h" > #include "openvswitch/vlog.h" > #include "util.h" > > @@ -53,6 +54,7 @@ ovsdb_trigger_init(struct ovsdb_session *session, struct > ovsdb *db, > trigger->request = request; > trigger->reply = NULL; > trigger->progress = NULL; > + trigger->txn_forward = NULL; > trigger->created = now; > trigger->timeout_msec = LLONG_MAX; > trigger->read_only = read_only; > @@ -65,6 +67,7 @@ void > ovsdb_trigger_destroy(struct ovsdb_trigger *trigger) > { > ovsdb_txn_progress_destroy(trigger->progress); > + ovsdb_txn_forward_destroy(trigger->db, trigger->txn_forward); > ovs_list_remove(&trigger->node); > jsonrpc_msg_destroy(trigger->request); > jsonrpc_msg_destroy(trigger->reply); > @@ -75,7 +78,7 @@ ovsdb_trigger_destroy(struct ovsdb_trigger *trigger) > bool > ovsdb_trigger_is_complete(const struct ovsdb_trigger *trigger) > { > - return trigger->reply && !trigger->progress; > + return trigger->reply && !trigger->progress && !trigger->txn_forward; > } > > struct jsonrpc_msg * > @@ -98,6 +101,11 @@ ovsdb_trigger_cancel(struct ovsdb_trigger *trigger, const > char *reason) > trigger->progress = NULL; > } > > + if (trigger->txn_forward) { > + ovsdb_txn_forward_destroy(trigger->db, trigger->txn_forward); > + trigger->txn_forward = NULL; > + } > + > jsonrpc_msg_destroy(trigger->reply); > trigger->reply = NULL; > > @@ -148,7 +156,7 @@ ovsdb_trigger_run(struct ovsdb *db, long long int now) > LIST_FOR_EACH_SAFE (t, next, node, &db->triggers) { > if (run_triggers > || now - t->created >= t->timeout_msec > - || t->progress) { > + || t->progress || t->txn_forward) { > if (ovsdb_trigger_try(t, now)) { > disconnect_all = true; > } > @@ -188,7 +196,7 @@ static bool > ovsdb_trigger_try(struct ovsdb_trigger *t, long long int now) > { > /* Handle "initialized" state. 
*/ > - if (!t->reply) { > + if (!t->reply && !t->txn_forward) { > ovs_assert(!t->progress); > > struct ovsdb_txn *txn = NULL; > @@ -198,13 +206,14 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long > int now) > return false; > } > > - bool durable; > + bool durable, forwarding_needed; > > struct json *result; > + /* Trying to compose transaction. */ > txn = ovsdb_execute_compose( > t->db, t->session, t->request->params, t->read_only, > t->role, t->id, now - t->created, &t->timeout_msec, > - &durable, &result); > + &durable, &forwarding_needed, &result); > if (!txn) { > if (result) { > /* Complete. There was an error but we still represent > it > @@ -217,9 +226,20 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int > now) > return false; > } > > - /* Transition to "committing" state. */ > - t->reply = jsonrpc_create_reply(result, t->request->id); > - t->progress = ovsdb_txn_propose_commit(txn, durable); > + if (forwarding_needed) { > + /* Transaction is good, but we don't need it. */ > + ovsdb_txn_abort(txn); > + json_destroy(result); > + /* Transition to "forwarding" state. */ > + t->txn_forward = ovsdb_txn_forward_create(t->db, t->request); > + /* Forward will not be completed immediately. Will check > + * next time. */ > + return false; > + } else { > + /* Transition to "committing" state. */ > + t->reply = jsonrpc_create_reply(result, t->request->id); > + t->progress = ovsdb_txn_propose_commit(txn, durable); > + } > } else if (!strcmp(t->request->method, "convert")) { > /* Permission check. */ > if (t->role && *t->role) { > @@ -348,6 +368,19 @@ ovsdb_trigger_try(struct ovsdb_trigger *t, long long int > now) > ovsdb_trigger_complete(t); > } > > + return false; > + } else if (t->txn_forward) { > + /* Handle "forwarding" state. */ > + if (!ovsdb_txn_forward_is_complete(t->txn_forward)) { > + return false; > + } > + > + /* Transition to "complete". 
*/ > + ovs_assert(!t->reply); > + t->reply = ovsdb_txn_forward_steal_reply(t->txn_forward); > + ovsdb_txn_forward_destroy(t->db, t->txn_forward); > + t->txn_forward = NULL; > + ovsdb_trigger_complete(t); > return false; > } > > diff --git a/ovsdb/trigger.h b/ovsdb/trigger.h > index 79af7f6be..d060c72e5 100644 > --- a/ovsdb/trigger.h > +++ b/ovsdb/trigger.h > @@ -22,26 +22,34 @@ struct ovsdb; > > /* Triggers have the following states: > * > - * - Initialized (reply == NULL, progress == NULL): Executing the trigger > - * can keep it in the initialized state, if it has a "wait" condition > that > - * isn't met. Executing the trigger can also yield an error, in which > - * case it transitions to "complete". Otherwise, execution yields a > - * transaction, which the database attempts to commit. If the > transaction > - * completes immediately and synchronously, then the trigger transitions > - * to the "complete" state. If the transaction requires some time to > - * complete, it transitions to the "committing" state. > + * - Initialized (reply == NULL, progress == NULL, txn_forward == NULL): > + * Executing the trigger can keep it in the initialized state, if it > has a > + * "wait" condition that isn't met. Executing the trigger can also > yield > + * an error, in which case it transitions to "complete". Otherwise, > + * execution yields a transaction, which the database attempts to > commit. > + * If the transaction completes immediately and synchronously, then the > + * trigger transitions to the "complete" state. If the transaction > + * requires some time to complete, it transitions to the "committing" > + * state. If the transaction can not be completed locally due to > + * read-only restrictions and transaction forwarding is enabled, starts > + * forwarding and transitions to the "forwarding" state. > * > - * - Committing (reply != NULL, progress != NULL): The transaction is > - * committing. 
If it succeeds, or if it fails permanently, then the > - * trigger transitions to "complete". If it fails temporarily > - * (e.g. because someone else committed to cluster-based storage before > we > - * did), then we transition back to "initialized" to try again. > + * - Committing (reply != NULL, progress != NULL, txn_forward == NULL): > + * The transaction is committing. If it succeeds, or if it fails > + * permanently, then the trigger transitions to "complete". If it fails > + * temporarily (e.g. because someone else committed to cluster-based > + * storage before we did), then we transition back to "initialized" to > + * try again. > * > - * - Complete (reply != NULL, progress == NULL): The transaction is done > - * and either succeeded or failed. > + * - Forwarding (reply == NULL, progress == NULL, txn_forward != NULL): > + * Transaction is forwarded. Either it succeeds or it fails, the > trigger > + * transitions to "complete". > + * > + * - Complete (reply != NULL, progress == NULL, txn_forward == NULL): > + * The transaction is done and either succeeded or failed. > */ > struct ovsdb_trigger { > - /* In "initialized" or "committing" state, in db->triggers. > + /* In "initialized", "committing" or "forwarding" state, in db->triggers. > * In "complete", in session->completions. */ > struct ovs_list node; > struct ovsdb_session *session; /* Session that owns this trigger. */ > @@ -49,6 +57,7 @@ struct ovsdb_trigger { > struct jsonrpc_msg *request; /* Database request. */ > struct jsonrpc_msg *reply; /* Result (null if none yet). */ > struct ovsdb_txn_progress *progress; > + struct ovsdb_txn_forward *txn_forward; /* Tracks transaction forwarding. > */ > long long int created; /* Time created. */ > long long int timeout_msec; /* Max wait duration. */ > bool read_only; /* Database is in read only mode. 
*/ > diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at > index ba1b369c1..ac243d6a7 100644 > --- a/tests/ovsdb-server.at > +++ b/tests/ovsdb-server.at > @@ -3,10 +3,13 @@ AT_BANNER([OVSDB -- ovsdb-server transactions (Unix > sockets)]) > m4_define([OVSDB_SERVER_SHUTDOWN], > [OVS_APP_EXIT_AND_WAIT_BY_TARGET([ovsdb-server], [ovsdb-server.pid])]) > > +m4_define([OVSDB_SERVER_SHUTDOWN_N], > + [cp pid$1 savepid$1 > + AT_CHECK([ovs-appctl -t "`pwd`"/unixctl$1 -e exit], [0], [ignore], > [ignore]) > + OVS_WAIT_WHILE([kill -0 `cat savepid$1`], [kill `cat savepid$1`])]) > + > m4_define([OVSDB_SERVER_SHUTDOWN2], > - [cp pid2 savepid2 > - AT_CHECK([ovs-appctl -t "`pwd`"/unixctl2 -e exit], [0], [ignore], > [ignore]) > - OVS_WAIT_WHILE([kill -0 `cat savepid2`], [kill `cat savepid2`])]) > + [OVSDB_SERVER_SHUTDOWN_N([2])]) > > # OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS]) > # > @@ -1412,6 +1415,82 @@ m4_define([OVSDB_CHECK_EXECUTION], > > EXECUTION_EXAMPLES > > +AT_BANNER([OVSDB -- ovsdb-server relay]) > + > +# OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS]) > +# > +# Creates a database with the given SCHEMA and starts an ovsdb-server on > +# it. Also starts a daisy chain of ovsdb-servers in relay mode where the > +# first relay server is connected to the main non-relay ovsdb-server. > +# > +# Runs each of the TRANSACTIONS (which should be a quoted list of > +# quoted strings) against one of relay servers in the middle with > +# ovsdb-client one at a time. The server executes read-only transactions > +# and forwards rest of them to the previous ovsdb-server in a chain. > +# The main ovsdb-server executes 'write' transactions. Transaction > +# reply with data updates propagates back through the chain to all > +# the servers and the client. 
> +# > +# main relay relay relay relay relay > +# server1 <-- server2 <-- server3 <-- server4 <-- server5 <-- server6 > +# ^ > +# | > +# ovsdb-client > +# > +# Checks that the overall output is OUTPUT, but UUIDs in the output > +# are replaced by markers of the form <N> where N is a number. The > +# first unique UUID is replaced by <0>, the next by <1>, and so on. > +# If a given UUID appears more than once it is always replaced by the > +# same marker. > +# > +# Checks that the dump of all databases is the same. > +# > +# TITLE is provided to AT_SETUP and KEYWORDS to AT_KEYWORDS. > +m4_define([OVSDB_CHECK_EXECUTION], > + [AT_SETUP([$1]) > + AT_KEYWORDS([ovsdb server tcp relay $5]) > + n_servers=6 > + target=4 > + $2 > schema > + schema_name=`ovsdb-tool schema-name schema` > + AT_CHECK([ovsdb-tool create db1 schema], [0], [stdout], [ignore]) > + > + on_exit 'kill `cat *.pid`' > + AT_CHECK([ovsdb-server --detach --no-chdir --log-file=ovsdb-server1.log > dnl > + --pidfile --remote=punix:db1.sock db1 > + ], [0], [ignore], [ignore]) > + > + for i in $(seq 2 ${n_servers}); do > + AT_CHECK([ovsdb-server --detach --no-chdir dnl > + --log-file=ovsdb-server$i.log dnl > + --pidfile=${i}.pid --remote=punix:db${i}.sock dnl > + --unixctl=unixctl${i} -vjsonrpc:file:dbg dnl > + relay:${schema_name}:unix:db$((i-1)).sock > + ], [0], [ignore], [ignore]) > + done > + > + m4_foreach([txn], [$3], > + [AT_CHECK([ovsdb-client transact unix:db${target}.sock 'txn'], [0], > + [stdout], [ignore]) > + cat stdout >> output > + ]) > + > + AT_CHECK([uuidfilt output], [0], [$4], [ignore]) > + > + AT_CHECK([ovsdb-client dump unix:db1.sock], [0], [stdout], [ignore]) > + for i in $(seq 2 ${n_servers}); do > + OVS_WAIT_UNTIL([ovsdb-client dump unix:db${i}.sock > dump${i}; dnl > + diff stdout dump${i}]) > + done > + > + OVSDB_SERVER_SHUTDOWN > + for i in $(seq 2 ${n_servers}); do > + OVSDB_SERVER_SHUTDOWN_N([$i]) > + done > + AT_CLEANUP]) > + > +EXECUTION_EXAMPLES > + > AT_BANNER([OVSDB -- 
ovsdb-server replication]) > > # OVSDB_CHECK_EXECUTION(TITLE, SCHEMA, TRANSACTIONS, OUTPUT, [KEYWORDS]) >
Acked-by: Mark D. Gray <[email protected]> _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
