From: Anton Ivanov <anton.iva...@cambridgegreys.com> Signed-off-by: Anton Ivanov <anton.iva...@cambridgegreys.com> --- northd/ovn-northd.c | 196 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 178 insertions(+), 18 deletions(-)
diff --git a/northd/ovn-northd.c b/northd/ovn-northd.c index a53b31906..ef188fb88 100644 --- a/northd/ovn-northd.c +++ b/northd/ovn-northd.c @@ -4146,7 +4146,9 @@ static void ovn_lflow_destroy(struct hmap *lflows, struct ovn_lflow *lflow) { if (lflow) { - hmap_remove(lflows, &lflow->hmap_node); + if (lflows) { + hmap_remove(lflows, &lflow->hmap_node); + } free(lflow->match); free(lflow->actions); free(lflow->stage_hint); @@ -11071,6 +11073,120 @@ build_lrouter_flows(struct hmap *datapaths, struct hmap *ports, } } + +struct sbrec_result { + struct ovs_list list_node; + const struct sbrec_logical_flow *sbflow; + struct ovn_lflow *lflow; + ssize_t lflow_hash; +}; + +struct reconcile_info { + struct northd_context *ctx; + struct hmap *lflows; + struct hmap *datapaths; + struct ovs_list results; +}; + +struct lflow_reconciliation_pool { + struct worker_pool *pool; +}; + +static void *reconciliation_thread(void *arg) { + struct worker_control *control = (struct worker_control *) arg; + struct lflow_reconciliation_pool *workload; + struct reconcile_info *ri; + struct sbrec_result *res; + + while (!seize_fire()) { + sem_wait(&control->fire); + workload = (struct lflow_reconciliation_pool *) control->workload; + ri = (struct reconcile_info *) control->data; + if (ri && workload) { + /* Push changes to the Logical_Flow table to database. */ + const struct sbrec_logical_flow *sbflow; + SBREC_LOGICAL_FLOW_PARALLEL_FOR_EACH(sbflow, ri->ctx->ovnsb_idl, control->id, workload->pool->size) { + struct ovn_datapath *od + = ovn_datapath_from_sbrec(ri->datapaths, sbflow->logical_datapath); + res = xmalloc(sizeof(struct sbrec_result)); + + if (!od || ovn_datapath_is_stale(od)) { + res->sbflow = sbflow; + res->lflow = NULL; + ovs_list_push_back(&ri->results, &res->list_node); + continue; + } + + enum ovn_datapath_type dp_type = od->nbs ? DP_SWITCH : DP_ROUTER; + enum ovn_pipeline pipeline + = !strcmp(sbflow->pipeline, "ingress") ? P_IN : P_OUT; + struct ovn_lflow *lflow = ovn_lflow_find( + ri->lflows, od, ovn_stage_build(dp_type, pipeline, sbflow->table_id), + sbflow->priority, sbflow->match, sbflow->actions, sbflow->hash); + if (lflow) { + res->lflow = lflow; + res->sbflow = sbflow; + res->lflow_hash = lflow->hmap_node.hash; + } else { + res->sbflow = sbflow; + res->lflow = NULL; + } + ovs_list_push_back(&ri->results, &res->list_node); + } + atomic_store_relaxed(&control->finished, true); + atomic_thread_fence(memory_order_release); + } + sem_post(control->done); + } + return NULL; +} + +static struct lflow_reconciliation_pool *reconcile_pool = NULL; + +static void init_reconciliation_pool(void) { + + int index; + + if (!reconcile_pool) { + reconcile_pool = + xmalloc(sizeof(struct lflow_reconciliation_pool)); + reconcile_pool->pool = + add_worker_pool(reconciliation_thread); + + for (index = 0; index < reconcile_pool->pool->size; index++) { + reconcile_pool->pool->controls[index].workload = + reconcile_pool; + } + } +} + +/* Removes 'node' from 'hmap' if present. Does not shrink the hash table; call + * hmap_shrink() directly if desired. + * Returns true if the node was found and removed, false otherwise. + * It needs both a node and a hash in order to function even if the node + * has already been freed. + */ +static bool +hmap_safe_remove(struct hmap *hmap, struct hmap_node *node, size_t hash) +{ + struct hmap_node **bucket = &hmap->buckets[hash & hmap->mask]; + + if (!node) { + return false; + } + + while ((*bucket) && (*bucket != node)) { + bucket = &(*bucket)->next; + } + if (*bucket) { + *bucket = node->next; + hmap->n--; + return true; + } + return false; +} + +#define RECONCILE_CUTOFF 1 static ssize_t max_seen_lflow_size = 128; @@ -11084,6 +11200,7 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths, struct hmap *lbs) { struct hmap lflows; + const struct sbrec_logical_flow *sbflow; fast_hmap_size_for(&lflows, max_seen_lflow_size); @@ -11096,27 +11213,70 @@ build_lflows(struct northd_context *ctx, struct hmap *datapaths, } /* Push changes to the Logical_Flow table to database. */ - const struct sbrec_logical_flow *sbflow, *next_sbflow; - SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) { - struct ovn_datapath *od - = ovn_datapath_from_sbrec(datapaths, sbflow->logical_datapath); - if (!od || ovn_datapath_is_stale(od)) { - sbrec_logical_flow_delete(sbflow); - continue; + if (hmap_count(&lflows) < RECONCILE_CUTOFF) { + /* Push changes to the Logical_Flow table to database. */ + const struct sbrec_logical_flow *next_sbflow; + SBREC_LOGICAL_FLOW_FOR_EACH_SAFE (sbflow, next_sbflow, ctx->ovnsb_idl) { + struct ovn_datapath *od + = ovn_datapath_from_sbrec(datapaths, sbflow->logical_datapath); + if (!od || ovn_datapath_is_stale(od)) { + sbrec_logical_flow_delete(sbflow); + continue; + } + + enum ovn_datapath_type dp_type = od->nbs ? DP_SWITCH : DP_ROUTER; + enum ovn_pipeline pipeline + = !strcmp(sbflow->pipeline, "ingress") ? P_IN : P_OUT; + struct ovn_lflow *lflow = ovn_lflow_find( + &lflows, od, ovn_stage_build(dp_type, pipeline, sbflow->table_id), + sbflow->priority, sbflow->match, sbflow->actions, sbflow->hash); + if (lflow) { + ovn_lflow_destroy(&lflows, lflow); + } else { + sbrec_logical_flow_delete(sbflow); + } } + } else { + struct reconcile_info *ri; + struct ovs_list *combined_result = NULL; + struct ovs_list **results = NULL; + int index; + init_reconciliation_pool(); - enum ovn_datapath_type dp_type = od->nbs ? DP_SWITCH : DP_ROUTER; - enum ovn_pipeline pipeline - = !strcmp(sbflow->pipeline, "ingress") ? P_IN : P_OUT; - struct ovn_lflow *lflow = ovn_lflow_find( - &lflows, od, ovn_stage_build(dp_type, pipeline, sbflow->table_id), - sbflow->priority, sbflow->match, sbflow->actions, sbflow->hash); - if (lflow) { - ovn_lflow_destroy(&lflows, lflow); - } else { - sbrec_logical_flow_delete(sbflow); + ri = xmalloc(sizeof(struct reconcile_info) * + reconcile_pool->pool->size); + results = xmalloc(sizeof(struct ovs_list *) * + reconcile_pool->pool->size); + + for (index = 0; + index < reconcile_pool->pool->size; index++) { + + ri[index].lflows = &lflows; + ri[index].datapaths = datapaths; + ri[index].ctx = ctx; + ovs_list_init(&ri[index].results); + results[index] = &ri[index].results; + reconcile_pool->pool->controls[index].data = &ri[index]; } + + run_pool_list( + reconcile_pool->pool, + &combined_result, + results); + + struct sbrec_result *res; + LIST_FOR_EACH_POP (res, list_node, combined_result) { + if (hmap_safe_remove(&lflows, &res->lflow->hmap_node, res->lflow_hash)) { + ovn_lflow_destroy(NULL, res->lflow); + } else { + sbrec_logical_flow_delete(res->sbflow); + } + free(res); + } + free(results); + free(ri); + } struct ovn_lflow *lflow, *next_lflow; HMAP_FOR_EACH_SAFE (lflow, next_lflow, hmap_node, &lflows) { -- 2.20.1 _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev