On Thu, Feb 25, 2021 at 7:33 PM Anton Ivanov <[email protected]> wrote: > > Found the most likely culprit. > > This is similar to this: https://bugzilla.redhat.com/show_bug.cgi?id=663584 > this: https://bugzilla.redhat.com/show_bug.cgi?id=1554955 > and god knows how many others. > > Selinux is "securing" your semaphores.
I disabled selinux (permissive) and I still see the same behavior > > A. > > On 25/02/2021 13:27, Anton Ivanov wrote: > > > > On 25/02/2021 12:41, Numan Siddique wrote: > >> On Fri, Feb 12, 2021 at 8:20 PM Anton Ivanov > >> <[email protected]> wrote: > >>> This adds a set of functions and macros intended to process > >>> hashes in parallel. > >>> > >>> The principles of operation are documented in the fasthmap.h > >>> > >>> If these one day go into the OVS tree, the OVS tree versions > >>> would be used in preference. > >>> > >>> Signed-off-by: Anton Ivanov <[email protected]> > >> Hi Anton, > >> > >> I still see problems with this patch set. > >> > >> If I apply the first 2 patches and run the tests, most of the test > >> cases fail or hang. > >> > >> When configured with gcc and address sanitizer, the test case fails - > >> 45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple > >> localnet ports with same tags (ovn.at:2970): FAILED > >> (ovs-macros.at:219) and I see the below in the testsuite.log > >> > >> ********* > >> clean up OVN > >> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid > >> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit > >> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null... > >> ovn.at:3091: wait succeeded immediately > >> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid > >> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit > >> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null... > >> ovn.at:3091: wait succeeded immediately > >> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid > >> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit > >> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null... > >> ovn.at:3091: wait succeeded after 1 seconds > >> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovn-northd.pid > >> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovn-northd exit > >> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null... > >> ovn.at:3091: wait succeeded quickly > >> > >> main: clean up vswitch > >> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovs-vswitchd.pid > >> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovs-vswitchd exit > >> --cleanup > >> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null... > >> ovn.at:3091: wait succeeded quickly > >> ../../tests/ovn.at:3091: test -e $OVS_RUNDIR/ovsdb-server.pid > >> ../../tests/ovn.at:3091: ovs-appctl --timeout=10 -t ovsdb-server exit > >> ovn.at:3091: waiting while kill -0 $TMPPID 2>/dev/null... > >> ovn.at:3091: wait succeeded immediately > >> Address Sanitizer reported errors in: asan.2986645 > >> ================================================================= > >> ==2986645==ERROR: AddressSanitizer: SEGV on unknown address > >> 0x14f45be57000 (pc 0x14f45f424212 bp 0x000000000000 sp 0x14f45b1fda80 > >> T2) > >> ==2986645==The signal is caused by a READ memory access. > >> ../../tests/ovs-macros.at:219: hard failure > >> 45. ovn.at:2970: 45. ovn -- 2 HVs, 2 LS, switching between multiple > >> localnet ports with same tags (ovn.at:2970): FAILED > >> (ovs-macros.at:219) > >> > >> ****** > >> > >> The same is observed for many test cases. > >> > >> When I configure clang and run the tests, all the tests pass, but I > >> see lot of coredumps. > >> I think I had reported this earlier too. > > > > I have absolutely no idea how you get this. Seriously. > > > > I have been unable to reproduce this > > > > The only thing that comes to mind is to "fuzz" the semaphore creation and > > introduce random failures there to see if the error logic works. I can't > > see any logical problems in it via code inspection. > > > >> > >> Here is the backtrace > >> ----- > >> #0 0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from > >> /lib64/libpthread.so.0 > >> [Current thread is 1 (Thread 0x1528ac732640 (LWP 2907214))] > >> Missing separate debuginfos, use: dnf debuginfo-install > >> glibc-2.32-3.fc33.x86_64 libcap-ng-0.8-1.fc33.x86_64 > >> libevent-2.1.8-10.fc33.x86_64 openssl-libs-1.1.1i-1.fc33.x86_64 > >> python3-libs-3.9.1-2.fc33.x86_64 unbound-libs-1.10.1-4.fc33.x86_64 > >> zlib-1.2.11-23.fc33.x86_64 > >> (gdb) bt > >> #0 0x00001528ad37a212 in __new_sem_wait_slow.constprop.0 () from > >> /lib64/libpthread.so.0 > >> #1 0x0000000000421d84 in wait_for_work (control=0xf5c0e0) at > >> ../lib/ovn-parallel-hmap.h:193 > >> #2 build_lflows_thread (arg=0xf5c0e0) at ../northd/ovn-northd.c:11807 > >> #3 0x000000000049c8b2 in ovsthread_wrapper (aux_=<optimized out>) at > >> ../lib/ovs-thread.c:383 > >> #4 0x00001528ad3713f9 in start_thread () from /lib64/libpthread.so.0 > >> #5 0x00001528ad00e903 in clone () from /lib64/libc.so.6 > >> ---- > >> > >> > >> I'm sorry but something is not right. Instead of using semaphores, why > >> can't we use > >> 'struct latch' for each worker and use it to synchronize between the main > >> thread > >> and the workers ? > > > > We can't because this will conflict with the main poll loop. > > > > You need to rewrite the entire northd processing logic and the IO logic to > > use latch here. Or establish parallel logic - part of the joy of using > > "thead once" to do poll magic. > > > > None of these are a realistic option. I'd rather try to understand exactly > > what happens on your setup and what makes a sem_open() fail and why that is > > not handled properly by the error checking code. > > > > Can you send the results of "sysctl kernel.sem" please? That's the only way > > I know to limit semaphores and the usual limits on Linux are in the 1G > > range. Here's the output sysctl kernel.sem kernel.sem = 32000 1024000000 500 32000 I ran the same tests on the rhel 8 system, I don't see any crashes. Thanks Numan > > > >> > >> The usage of the function - can_parallelize_hashes() in ovn-northd.c is > >> very > >> confusing to me. > >> > >> I see that ovn_can_parallelize_hashes() calls setup_worker_pools() only > >> once > >> and for the subsequent calls, this function will be a no-op (due to > >> atomic_compare_exchange_strong()). > >> > >> Since this function is called before the workers are started, what is > >> the need to use > >> atomic_compare_exchange_strong() ? > > > > We have ended there over time. There was a point where there was no option > > and no initial invocation wasn't and we were invoking the set-up on first > > processing. > > > > In fact that is how it is invoked in the OVS codebase port for the > > parallelized monitors. I would actually like to keep this as an option. > > > >> > >> Let's say we want to add another config option - > >> force_northd_parallel, and if the user > >> toggles the value between the runs, then the below code in > >> ovnnb_db_run() in patch 3 > >> > >> --- > >> use_parallel_build = smap_get_bool(&nb->options, > >> "use_parallel_build", false) && > >> ovn_can_parallelize_hashes(false); > >> -- > >> will have no effect since setup_worker_pools() is not called later > >> once the atomic > >> bool initial_pool_setup is set to true. > >> > >> I think we should provide the option to toggle this value at run time. > >> Also the patch 3 should > >> add an option to configure the force_parallel. > >> > >>> --- > >>> lib/automake.mk | 2 + > >>> lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++ > >>> lib/ovn-parallel-hmap.h | 285 +++++++++++++++++++++++++ > >>> 3 files changed, 742 insertions(+) > >>> create mode 100644 lib/ovn-parallel-hmap.c > >>> create mode 100644 lib/ovn-parallel-hmap.h > >>> > >>> diff --git a/lib/automake.mk b/lib/automake.mk > >>> index 250c7aefa..781be2109 100644 > >>> --- a/lib/automake.mk > >>> +++ b/lib/automake.mk > >>> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \ > >>> lib/expr.c \ > >>> lib/extend-table.h \ > >>> lib/extend-table.c \ > >>> + lib/ovn-parallel-hmap.h \ > >>> + lib/ovn-parallel-hmap.c \ > >>> lib/ip-mcast-index.c \ > >>> lib/ip-mcast-index.h \ > >>> lib/mcast-group-index.c \ > >>> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c > >>> new file mode 100644 > >>> index 000000000..06aa95aba > >>> --- /dev/null > >>> +++ b/lib/ovn-parallel-hmap.c > >>> @@ -0,0 +1,455 @@ > >>> +/* > >>> + * Copyright (c) 2020 Red Hat, Inc. > >>> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc. > >>> + * > >>> + * Licensed under the Apache License, Version 2.0 (the "License"); > >>> + * you may not use this file except in compliance with the License. > >>> + * You may obtain a copy of the License at: > >>> + * > >>> + * http://www.apache.org/licenses/LICENSE-2.0 > >>> + * > >>> + * Unless required by applicable law or agreed to in writing, software > >>> + * distributed under the License is distributed on an "AS IS" BASIS, > >>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > >>> implied. > >>> + * See the License for the specific language governing permissions and > >>> + * limitations under the License. > >>> + */ > >>> + > >>> +#include <config.h> > >>> +#include <stdint.h> > >>> +#include <string.h> > >>> +#include <stdlib.h> > >>> +#include <fcntl.h> > >>> +#include <unistd.h> > >>> +#include <errno.h> > >>> +#include <semaphore.h> > >>> +#include "fatal-signal.h" > >>> +#include "util.h" > >>> +#include "openvswitch/vlog.h" > >>> +#include "openvswitch/hmap.h" > >>> +#include "openvswitch/thread.h" > >>> +#include "ovn-parallel-hmap.h" > >>> +#include "ovs-atomic.h" > >>> +#include "ovs-thread.h" > >>> +#include "ovs-numa.h" > >>> +#include "random.h" > >>> + > >>> +VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap); > >>> + > >>> +#ifndef OVS_HAS_PARALLEL_HMAP > >>> + > >>> +#define WORKER_SEM_NAME "%x-%p-%x" > >>> +#define MAIN_SEM_NAME "%x-%p-main" > >>> + > >>> +/* These are accessed under mutex inside add_worker_pool(). > >>> + * They do not need to be atomic. > >>> + */ > >>> + > >>> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false); > >>> +static bool can_parallelize = false; > >>> + > >>> +/* This is set only in the process of exit and the set is > >>> + * accompanied by a fence. It does not need to be atomic or be > >>> + * accessed under a lock. > >>> + */ > >>> + > >>> +static bool workers_must_exit = false; > >>> + > >>> +static struct ovs_list worker_pools = > >>> OVS_LIST_INITIALIZER(&worker_pools); > >>> + > >>> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER; > >>> + > >>> +static int pool_size; > >>> + > >>> +static int sembase; > >>> + > >>> +static void worker_pool_hook(void *aux OVS_UNUSED); > >>> +static void setup_worker_pools(bool force); > >>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED, > >>> + void *fin_result, void *result_frags, > >>> + int index); > >>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED, > >>> + void *fin_result, void *result_frags, > >>> + int index); > >>> + > >>> +bool ovn_stop_parallel_processing(void) > >>> +{ > >>> + return workers_must_exit; > >>> +} > >>> + > >>> +bool ovn_can_parallelize_hashes(bool force_parallel) > >>> +{ > >>> + bool test = false; > >>> + > >>> + if (atomic_compare_exchange_strong( > >>> + &initial_pool_setup, > >>> + &test, > >>> + true)) { > >>> + ovs_mutex_lock(&init_mutex); > >>> + setup_worker_pools(force_parallel); > >>> + ovs_mutex_unlock(&init_mutex); > >>> + } > >>> + return can_parallelize; > >>> +} > >>> + > >>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){ > >>> + > >>> + struct worker_pool *new_pool = NULL; > >>> + struct worker_control *new_control; > >>> + bool test = false; > >>> + int i; > >>> + char sem_name[256]; > >>> + > >>> + > >>> + /* Belt and braces - initialize the pool system just in case if > >>> + * if it is not yet initialized. > >>> + */ > >>> + > >>> + if (atomic_compare_exchange_strong( > >>> + &initial_pool_setup, > >>> + &test, > >>> + true)) { > >>> + ovs_mutex_lock(&init_mutex); > >>> + setup_worker_pools(false); > >>> + ovs_mutex_unlock(&init_mutex); > >>> + } > >>> + > >>> + ovs_mutex_lock(&init_mutex); > >>> + if (can_parallelize) { > >>> + new_pool = xmalloc(sizeof(struct worker_pool)); > >>> + new_pool->size = pool_size; > >>> + new_pool->controls = NULL; > >>> + sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool); > >>> + new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0); > >>> + if (new_pool->done == SEM_FAILED) { > >>> + goto cleanup; > >>> + } > >>> + > >>> + new_pool->controls = > >>> + xmalloc(sizeof(struct worker_control) * new_pool->size); > >>> + > >>> + for (i = 0; i < new_pool->size; i++) { > >>> + new_control = &new_pool->controls[i]; > >>> + new_control->id = i; > >>> + new_control->done = new_pool->done; > >>> + new_control->data = NULL; > >>> + ovs_mutex_init(&new_control->mutex); > >>> + new_control->finished = ATOMIC_VAR_INIT(false); > >>> + sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i); > >>> + new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0); > >>> + if (new_control->fire == SEM_FAILED) { > >>> + goto cleanup; > >>> + } > >>> + } > >>> + > >>> + for (i = 0; i < pool_size; i++) { > >>> + ovs_thread_create("worker pool helper", start, > >>> &new_pool->controls[i]); > >>> + } > >>> + ovs_list_push_back(&worker_pools, &new_pool->list_node); > >>> + } > >>> + ovs_mutex_unlock(&init_mutex); > >>> + return new_pool; > >>> +cleanup: > >>> + > >>> + /* Something went wrong when opening semaphores. In this case > >>> + * it is better to shut off parallel procesing altogether > >>> + */ > >>> + > >>> + VLOG_INFO("Failed to initialize parallel processing, error %d", > >>> errno); > >>> + can_parallelize = false; > >>> + if (new_pool->controls) { > >>> + for (i = 0; i < new_pool->size; i++) { > >>> + if (new_pool->controls[i].fire != SEM_FAILED) { > >>> + sem_close(new_pool->controls[i].fire); > >>> + sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i); > >>> + sem_unlink(sem_name); > >>> + break; /* semaphores past this one are uninitialized */ > >>> + } > >>> + } > >>> + } > >>> + if (new_pool->done != SEM_FAILED) { > >>> + sem_close(new_pool->done); > >>> + sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool); > >>> + sem_unlink(sem_name); > >>> + } > >>> + ovs_mutex_unlock(&init_mutex); > >>> + return NULL; > >>> +} > >>> + > >>> + > >>> +/* Initializes 'hmap' as an empty hash table with mask N. */ > >>> +void > >>> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask) > >>> +{ > >>> + size_t i; > >>> + > >>> + hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1)); > >>> + hmap->one = NULL; > >>> + hmap->mask = mask; > >>> + hmap->n = 0; > >>> + for (i = 0; i <= hmap->mask; i++) { > >>> + hmap->buckets[i] = NULL; > >>> + } > >>> +} > >>> + > >>> +/* Initializes 'hmap' as an empty hash table of size X. > >>> + * Intended for use in parallel processing so that all > >>> + * fragments used to store results in a parallel job > >>> + * are the same size. > >>> + */ > >>> +void > >>> +ovn_fast_hmap_size_for(struct hmap *hmap, int size) > >>> +{ > >>> + size_t mask; > >>> + mask = size / 2; > >>> + mask |= mask >> 1; > >>> + mask |= mask >> 2; > >>> + mask |= mask >> 4; > >>> + mask |= mask >> 8; > >>> + mask |= mask >> 16; > >>> +#if SIZE_MAX > UINT32_MAX > >>> + mask |= mask >> 32; > >>> +#endif > >>> + > >>> + /* If we need to dynamically allocate buckets we might as well > >>> allocate at > >>> + * least 4 of them. */ > >>> + mask |= (mask & 1) << 1; > >>> + > >>> + fast_hmap_init(hmap, mask); > >>> +} > >>> + > >>> +/* Run a thread pool which uses a callback function to process results > >>> + */ > >>> + > >>> +void ovn_run_pool_callback(struct worker_pool *pool, > >>> + void *fin_result, void *result_frags, > >>> + void (*helper_func)(struct worker_pool *pool, > >>> + void *fin_result, > >>> + void *result_frags, int > >>> index)) > >>> +{ > >>> + int index, completed; > >>> + > >>> + /* Ensure that all worker threads see the same data as the > >>> + * main thread. > >>> + */ > >>> + > >>> + atomic_thread_fence(memory_order_acq_rel); > >>> + > >>> + /* Start workers */ > >>> + > >>> + for (index = 0; index < pool->size; index++) { > >>> + sem_post(pool->controls[index].fire); > >>> + } > >>> + > >>> + completed = 0; > >>> + > >>> + do { > >>> + bool test; > >>> + /* Note - we do not loop on semaphore until it reaches > >>> + * zero, but on pool size/remaining workers. > >>> + * This is by design. If the inner loop can handle > >>> + * completion for more than one worker within an iteration > >>> + * it will do so to ensure no additional iterations and > >>> + * waits once all of them are done. > >>> + * > >>> + * This may result in us having an initial positive value > >>> + * of the semaphore when the pool is invoked the next time. > >>> + * This is harmless - the loop will spin up a couple of times > >>> + * doing nothing while the workers are processing their data > >>> + * slices. > >>> + */ > >>> + sem_wait(pool->done); > >>> + for (index = 0; index < pool->size; index++) { > >>> + test = true; > >>> + /* If the worker has marked its data chunk as complete, > >>> + * invoke the helper function to combine the results of > >>> + * this worker into the main result. > >>> + * > >>> + * The worker must invoke an appropriate memory fence > >>> + * (most likely acq_rel) to ensure that the main thread > >>> + * sees all of the results produced by the worker. > >>> + */ > >>> + if (atomic_compare_exchange_weak( > >>> + &pool->controls[index].finished, > >>> + &test, > >>> + false)) { > >>> + if (helper_func) { > >>> + (helper_func)(pool, fin_result, result_frags, index); > >>> + } > >>> + completed++; > >>> + pool->controls[index].data = NULL; > >>> + } > >>> + } > >>> + } while (completed < pool->size); > >>> +} > >>> + > >>> +/* Run a thread pool - basic, does not do results processing. > >>> + */ > >>> + > >>> +void ovn_run_pool(struct worker_pool *pool) > >>> +{ > >>> + run_pool_callback(pool, NULL, NULL, NULL); > >>> +} > >>> + > >>> +/* Brute force merge of a hashmap into another hashmap. > >>> + * Intended for use in parallel processing. The destination > >>> + * hashmap MUST be the same size as the one being merged. > >>> + * > >>> + * This can be achieved by pre-allocating them to correct size > >>> + * and using hmap_insert_fast() instead of hmap_insert() > >>> + */ > >>> + > >>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc) > >>> +{ > >>> + size_t i; > >>> + > >>> + ovs_assert(inc->mask == dest->mask); > >>> + > >>> + if (!inc->n) { > >>> + /* Request to merge an empty frag, nothing to do */ > >>> + return; > >>> + } > >>> + > >>> + for (i = 0; i <= dest->mask; i++) { > >>> + struct hmap_node **dest_bucket = &dest->buckets[i]; > >>> + struct hmap_node **inc_bucket = &inc->buckets[i]; > >>> + if (*inc_bucket != NULL) { > >>> + struct hmap_node *last_node = *inc_bucket; > >>> + while (last_node->next != NULL) { > >>> + last_node = last_node->next; > >>> + } > >>> + last_node->next = *dest_bucket; > >>> + *dest_bucket = *inc_bucket; > >>> + *inc_bucket = NULL; > >>> + } > >>> + } > >>> + dest->n += inc->n; > >>> + inc->n = 0; > >>> +} > >>> + > >>> +/* Run a thread pool which gathers results in an array > >>> + * of hashes. Merge results. > >>> + */ > >>> + > >>> + > >>> +void ovn_run_pool_hash( > >>> + struct worker_pool *pool, > >>> + struct hmap *result, > >>> + struct hmap *result_frags) > >>> +{ > >>> + run_pool_callback(pool, result, result_frags, merge_hash_results); > >>> +} > >>> + > >>> +/* Run a thread pool which gathers results in an array of lists. > >>> + * Merge results. > >>> + */ > >>> +void ovn_run_pool_list( > >>> + struct worker_pool *pool, > >>> + struct ovs_list *result, > >>> + struct ovs_list *result_frags) > >>> +{ > >>> + run_pool_callback(pool, result, result_frags, merge_list_results); > >>> +} > >>> + > >>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks > >>> *hrl) > >>> +{ > >>> + int i; > >>> + if (hrl->mask != lflows->mask) { > >>> + if (hrl->row_locks) { > >>> + free(hrl->row_locks); > >>> + } > >>> + hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask > >>> + 1); > >>> + hrl->mask = lflows->mask; > >>> + for (i = 0; i <= lflows->mask; i++) { > >>> + ovs_mutex_init(&hrl->row_locks[i]); > >>> + } > >>> + } > >>> +} > >>> + > >>> +static void worker_pool_hook(void *aux OVS_UNUSED) { > >>> + int i; > >>> + static struct worker_pool *pool; > >>> + char sem_name[256]; > >>> + > >>> + workers_must_exit = true; > >>> + > >>> + /* All workers must honour the must_exit flag and check for it > >>> regularly. > >>> + * We can make it atomic and check it via atomics in workers, but > >>> that > >>> + * is not really necessary as it is set just once - when the program > >>> + * terminates. So we use a fence which is invoked before exiting > >>> instead. > >>> + */ > >>> + atomic_thread_fence(memory_order_acq_rel); > >>> + > >>> + /* Wake up the workers after the must_exit flag has been set */ > >>> + > >>> + LIST_FOR_EACH (pool, list_node, &worker_pools) { > >>> + for (i = 0; i < pool->size ; i++) { > >>> + sem_post(pool->controls[i].fire); > >>> + } > >>> + for (i = 0; i < pool->size ; i++) { > >>> + sem_close(pool->controls[i].fire); > >>> + sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i); > >>> + sem_unlink(sem_name); > >>> + } > >>> + sem_close(pool->done); > >>> + sprintf(sem_name, MAIN_SEM_NAME, sembase, pool); > >>> + sem_unlink(sem_name); > >>> + } > >>> +} > >>> + > >>> +static void setup_worker_pools(bool force) { > >>> + int cores, nodes; > >>> + > >>> + nodes = ovs_numa_get_n_numas(); > >>> + if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) { > >>> + nodes = 1; > >>> + } > >>> + cores = ovs_numa_get_n_cores(); > >>> + > >>> + /* If there is no NUMA config, use 4 cores. > >>> + * If there is NUMA config use half the cores on > >>> + * one node so that the OS does not start pushing > >>> + * threads to other nodes. > >>> + */ > >>> + if (cores == OVS_CORE_UNSPEC || cores <= 0) { > >>> + /* If there is no NUMA we can try the ovs-threads routine. > >>> + * It falls back to sysconf and/or affinity mask. > >>> + */ > >>> + cores = count_cpu_cores(); > >>> + pool_size = cores; > >>> + } else { > >>> + pool_size = cores / nodes; > >>> + } > >>> + if ((pool_size < 4) && force) { > >>> + pool_size = 4; > >>> + } > >>> + can_parallelize = (pool_size >= 3); > >>> + fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true); > >>> + sembase = random_uint32(); > >>> +} > >>> + > >>> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED, > >>> + void *fin_result, void *result_frags, > >>> + int index) > >>> +{ > >>> + struct ovs_list *result = (struct ovs_list *)fin_result; > >>> + struct ovs_list *res_frags = (struct ovs_list *)result_frags; > >>> + > >>> + if (!ovs_list_is_empty(&res_frags[index])) { > >>> + ovs_list_splice(result->next, > >>> + ovs_list_front(&res_frags[index]), &res_frags[index]); > >>> + } > >>> +} > >>> + > >>> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED, > >>> + void *fin_result, void *result_frags, > >>> + int index) > >>> +{ > >>> + struct hmap *result = (struct hmap *)fin_result; > >>> + struct hmap *res_frags = (struct hmap *)result_frags; > >>> + > >>> + fast_hmap_merge(result, &res_frags[index]); > >>> + hmap_destroy(&res_frags[index]); > >>> +} > >>> + > >>> +#endif > >>> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h > >>> new file mode 100644 > >>> index 000000000..71ad17fb0 > >>> --- /dev/null > >>> +++ b/lib/ovn-parallel-hmap.h > >>> @@ -0,0 +1,285 @@ > >>> +/* > >>> + * Copyright (c) 2020 Red Hat, Inc. > >>> + * > >>> + * Licensed under the Apache License, Version 2.0 (the "License"); > >>> + * you may not use this file except in compliance with the License. > >>> + * You may obtain a copy of the License at: > >>> + * > >>> + * http://www.apache.org/licenses/LICENSE-2.0 > >>> + * > >>> + * Unless required by applicable law or agreed to in writing, software > >>> + * distributed under the License is distributed on an "AS IS" BASIS, > >>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > >>> implied. > >>> + * See the License for the specific language governing permissions and > >>> + * limitations under the License. > >>> + */ > >>> + > >>> +#ifndef OVN_PARALLEL_HMAP > >>> +#define OVN_PARALLEL_HMAP 1 > >>> + > >>> +/* if the parallel macros are defined by hmap.h or any other ovs define > >>> + * we skip over the ovn specific definitions. > >>> + */ > >>> + > >>> +#ifdef __cplusplus > >>> +extern "C" { > >>> +#endif > >>> + > >>> +#include <stdbool.h> > >>> +#include <stdlib.h> > >>> +#include <semaphore.h> > >>> +#include "openvswitch/util.h" > >>> +#include "openvswitch/hmap.h" > >>> +#include "openvswitch/thread.h" > >>> +#include "ovs-atomic.h" > >>> + > >>> +/* Process this include only if OVS does not supply parallel definitions > >>> + */ > >>> + > >>> +#ifdef OVS_HAS_PARALLEL_HMAP > >>> + > >>> +#include "parallel-hmap.h" > >>> + > >>> +#else > >>> + > >>> + > >>> +#ifdef __clang__ > >>> +#pragma clang diagnostic push > >>> +#pragma clang diagnostic ignored "-Wthread-safety" > >>> +#endif > >> I think you missed addressing my comment I provided in v13 to > >> add some comments on why this is required. > >> > >> > >>> + > >>> + > >>> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part > >>> + * of parallel processing. > >>> + * Each worker thread has a different ThreadID in the range of > >>> 0..POOL_SIZE > >>> + * and will iterate hash buckets ThreadID, ThreadID + step, > >>> + * ThreadID + step * 2, etc. The actual macro accepts > >>> + * ThreadID + step * i as the JOBID parameter. > >>> + */ > >>> + > >>> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \ > >>> + for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), > >>> MEMBER); \ > >>> + (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \ > >>> + || ((NODE = NULL), false); \ > >>> + ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), > >>> MEMBER)) > >>> + > >>> +/* We do not have a SAFE version of the macro, because the hash size is > >>> not > >>> + * atomic and hash removal operations would need to be wrapped with > >>> + * locks. This will defeat most of the benefits from doing anything in > >>> + * parallel. > >>> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove > >>> elements, > >>> + * each thread should store them in a temporary list result instead, > >>> merging > >>> + * the lists into a combined result at the end */ > >>> + > >>> +/* Work "Handle" */ > >>> + > >>> +struct worker_control { > >>> + int id; /* Used as a modulo when iterating over a hash. */ > >>> + atomic_bool finished; /* Set to true after achunk of work is > >>> complete. */ > >>> + sem_t *fire; /* Work start semaphore - sem_post starts the worker. */ > >>> + sem_t *done; /* Work completion semaphore - sem_post on completion. > >>> */ > >>> + struct ovs_mutex mutex; /* Guards the data. */ > >>> + void *data; /* Pointer to data to be processed. */ > >>> + void *workload; /* back-pointer to the worker pool structure. */ > >>> +}; > >>> + > >>> +struct worker_pool { > >>> + int size; /* Number of threads in the pool. */ > >>> + struct ovs_list list_node; /* List of pools - used in cleanup/exit. > >>> */ > >>> + struct worker_control *controls; /* "Handles" in this pool. */ > >>> + sem_t *done; /* Work completion semaphorew. */ > >>> +}; > >>> + > >>> +/* Add a worker pool for thread function start() which expects a pointer > >>> to > >>> + * a worker_control structure as an argument. */ > >>> + > >>> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)); > >>> + > >>> +/* Setting this to true will make all processing threads exit */ > >>> + > >>> +bool ovn_stop_parallel_processing(void); > >>> + > >>> +/* Build a hmap pre-sized for size elements */ > >>> + > >>> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size); > >>> + > >>> +/* Build a hmap with a mask equals to size */ > >>> + > >>> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size); > >>> + > >>> +/* Brute-force merge a hmap into hmap. > >>> + * Dest and inc have to have the same mask. The merge is performed > >>> + * by extending the element list for bucket N in the dest hmap with the > >>> list > >>> + * from bucket N in inc. > >>> + */ > >>> + > >>> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc); > >>> + > >>> +/* Run a pool, without any default processing of results. > >>> + */ > >>> + > >>> +void ovn_run_pool(struct worker_pool *pool); > >>> + > >>> +/* Run a pool, merge results from hash frags into a final hash result. > >>> + * The hash frags must be pre-sized to the same size. > >>> + */ > >>> + > >>> +void ovn_run_pool_hash(struct worker_pool *pool, > >>> + struct hmap *result, struct hmap *result_frags); > >>> +/* Run a pool, merge results from list frags into a final list result. > >>> + */ > >>> + > >>> +void ovn_run_pool_list(struct worker_pool *pool, > >>> + struct ovs_list *result, struct ovs_list > >>> *result_frags); > >>> + > >>> +/* Run a pool, call a callback function to perform processing of results. > >>> + */ > >>> + > >>> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result, > >>> + void *result_frags, > >>> + void (*helper_func)(struct worker_pool *pool, > >>> + void *fin_result, void *result_frags, int > >>> index)); > >>> + > >>> + > >>> +/* Returns the first node in 'hmap' in the bucket in which the given > >>> 'hash' > >>> + * would land, or a null pointer if that bucket is empty. */ > >>> + > >>> +static inline struct hmap_node * > >>> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num) > >>> +{ > >>> + return hmap->buckets[num]; > >>> +} > >>> + > >>> +static inline struct hmap_node * > >>> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t > >>> pool_size) > >>> +{ > >>> + size_t i; > >>> + for (i = start; i <= hmap->mask; i+= pool_size) { > >>> + struct hmap_node *node = hmap->buckets[i]; > >>> + if (node) { > >>> + return node; > >>> + } > >>> + } > >>> + return NULL; > >>> +} > >>> + > >>> +/* Returns the first node in 'hmap', as expected by thread with job_id > >>> + * for parallel processing in arbitrary order, or a null pointer if > >>> + * the slice of 'hmap' for that job_id is empty. */ > >>> +static inline struct hmap_node * > >>> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t > >>> pool_size) > >>> +{ > >>> + return parallel_hmap_next__(hmap, job_id, pool_size); > >>> +} > >>> + > >>> +/* Returns the next node in the slice of 'hmap' following 'node', > >>> + * in arbitrary order, or a * null pointer if 'node' is the last node in > >>> + * the 'hmap' slice. > >>> + * > >>> + */ > >>> +static inline struct hmap_node * > >>> +parallel_hmap_next(const struct hmap *hmap, > >>> + const struct hmap_node *node, ssize_t pool_size) > >>> +{ > >>> + return (node->next > >>> + ? node->next > >>> + : parallel_hmap_next__(hmap, > >>> + (node->hash & hmap->mask) + pool_size, pool_size)); > >>> +} > >>> + > >>> +static inline void post_completed_work(struct worker_control *control) > >>> +{ > >>> + atomic_thread_fence(memory_order_acq_rel); > >>> + atomic_store_relaxed(&control->finished, true); > >>> + sem_post(control->done); > >>> +} > >>> + > >>> +static inline void wait_for_work(struct worker_control *control) > >>> +{ > >>> + sem_wait(control->fire); > >>> +} > >>> + > >>> +/* Hash per-row locking support - to be used only in conjunction > >>> + * with fast hash inserts. Normal hash inserts may resize the hash > >>> + * rendering the locking invalid. > >>> + */ > >>> + > >>> +struct hashrow_locks { > >>> + ssize_t mask; > >>> + struct ovs_mutex *row_locks; > >>> +}; > >>> + > >>> +/* Update an hash row locks structure to match the current hash size */ > >>> + > >>> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks > >>> *hrl); > >>> + > >>> +/* Lock a hash row */ > >>> + > >>> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t > >>> hash) > >>> +{ > >>> + ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]); > >>> +} > >>> + > >>> +/* Unlock a hash row */ > >>> + > >>> +static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t > >>> hash) > >>> +{ > >>> + ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]); > >>> +} > >>> +/* Init the row locks structure */ > >>> + > >>> +static inline void init_hash_row_locks(struct hashrow_locks *hrl) > >>> +{ > >>> + hrl->mask = 0; > >>> + hrl->row_locks = NULL; > >>> +} > >>> + > >>> +bool ovn_can_parallelize_hashes(bool force_parallel); > >>> + > >>> +/* Use the OVN library functions for stuff which OVS has not defined > >>> + * If OVS has defined these, they will still compile using the OVN > >>> + * local names, but will be dropped by the linker in favour of the OVS > >>> + * supplied functions. > >>> + */ > >>> + > >>> +#define update_hashrow_locks(lflows, hrl) > >>> ovn_update_hashrow_locks(lflows, hrl) > >>> + > >>> +#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force) > >>> + > >>> +#define stop_parallel_processing() ovn_stop_parallel_processing() > >>> + > >>> +#define add_worker_pool(start) ovn_add_worker_pool(start) > >>> + > >>> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size) > >>> + > >>> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size) > >>> + > >>> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc) > >>> + > >>> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc) > >>> + > >>> +#define ovn_run_pool(pool) ovn_run_pool(pool) > >>> + > >>> +#define run_pool_hash(pool, result, result_frags) \ > >>> + ovn_run_pool_hash(pool, result, result_frags) > >>> + > >>> +#define run_pool_list(pool, result, result_frags) \ > >>> + ovn_run_pool_list(pool, result, result_frags) > >>> + > >>> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \ > >>> + ovn_run_pool_callback(pool, fin_result, result_frags, helper_func) > >>> + > >>> + > >> In my opinion, we can switch over to making use of OVS APIs for fast_hmap > >> once the patches in OVS are merged. Until then I think we should just > >> assume > >> that these functions are part of OVN lib and consume them directly. > >> > >> It's possible that the function names could change when those patches > >> land in OVS. > >> > >> Thanks > >> Numan > >> > >>> + > >>> +#ifdef __clang__ > >>> +#pragma clang diagnostic pop > >>> +#endif > >>> + > >>> +#endif > >>> + > >>> +#ifdef __cplusplus > >>> +} > >>> +#endif > >>> + > >>> + > >>> +#endif /* lib/fasthmap.h */ > >>> -- > >>> 2.20.1 > >>> > >>> _______________________________________________ > >>> dev mailing list > >>> [email protected] > >>> https://mail.openvswitch.org/mailman/listinfo/ovs-dev > >>> > > -- > Anton R. Ivanov > Cambridgegreys Limited. Registered in England. Company Number 10273661 > https://www.cambridgegreys.com/ > > _______________________________________________ > dev mailing list > [email protected] > https://mail.openvswitch.org/mailman/listinfo/ovs-dev _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
