On Thu, Mar 25, 2021 at 3:01 PM Anton Ivanov <[email protected]> wrote:
>
>
>
> On 24/03/2021 15:31, Numan Siddique wrote:
> > On Mon, Mar 1, 2021 at 6:35 PM <[email protected]> wrote:
> >>
> >> From: Anton Ivanov <[email protected]>
> >>
> >> This adds a set of functions and macros intended to process
> >> hashes in parallel.
> >>
> >> The principles of operation are documented in the ovn-parallel-hmap.h
> >>
> >> If these one day go into the OVS tree, the OVS tree versions
> >> would be used in preference.
> >>
> >> Signed-off-by: Anton Ivanov <[email protected]>
> >
> > Hi Anton,
> >
> > I tested the first 2 patches of this series and it crashes again for me.
> >
> > This time I ran tests on a 4 core machine - Intel(R) Xeon(R) CPU
> > E3-1220 v5 @ 3.00GHz
> >
> > The below trace is seen for both gcc and clang.
> >
> > ----
> > [Thread debugging using libthread_db enabled]
> > Using host libthread_db library "/lib64/libthread_db.so.1".
> > Core was generated by `ovn-northd -vjsonrpc
> > --ovnnb-db=unix:/mnt/mydisk/myhome/numan_alt/work/ovs_ovn/'.
> > Program terminated with signal SIGSEGV, Segmentation fault.
> > #0 0x00007f27594ae212 in __new_sem_wait_slow.constprop.0 () from
> > /lib64/libpthread.so.0
> > [Current thread is 1 (Thread 0x7f2758c68640 (LWP 347378))]
> > Missing separate debuginfos, use: dnf debuginfo-install
> > glibc-2.32-3.fc33.x86_64 libcap-ng-0.8-1.fc33.x86_64
> > libevent-2.1.8-10.fc33.x86_64 openssl-libs-1.1.1i-1.fc33.x86_64
> > python3-libs-3.9.1-2.fc33.x86_64 unbound-libs-1.10.1-4.fc33.x86_64
> > zlib-1.2.11-23.fc33.x86_64
> > (gdb) bt
> > #0 0x00007f27594ae212 in __new_sem_wait_slow.constprop.0 () from
> > /lib64/libpthread.so.0
> > #1 0x0000000000422184 in wait_for_work (control=<optimized out>) at
> > ../lib/ovn-parallel-hmap.h:203
> > #2 build_lflows_thread (arg=0x2538420) at ../northd/ovn-northd.c:11855
> > #3 0x000000000049cd12 in ovsthread_wrapper (aux_=<optimized out>) at
> > ../lib/ovs-thread.c:383
> > #4 0x00007f27594a53f9 in start_thread () from /lib64/libpthread.so.0
> > #5 0x00007f2759142903 in clone () from /lib64/libc.so.6
> > -----
> >
> > I'm not sure why you're not able to reproduce this issue.
>
> I can't. I have run it for days in a loop.
>
> One possibility is that for whatever reason your machine has slower IPC
> speeds compared to linear execution speeds. Thread debugging? AMD vs Intel?
> No idea.
>
> There is a race on exit in the current code which I have found by inspection
> and which I have never been able to trigger. On my machines the workers
> always exit in time before the main thread has finished, so I cannot trigger
> this.
>
> Can you try this incremental fix to see if it fixes the problem for you? If
> that works, I will incorporate it and reissue the patch. If not, I will
> continue digging.
>
> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
> index e83ae23cb..3597f896f 100644
> --- a/lib/ovn-parallel-hmap.c
> +++ b/lib/ovn-parallel-hmap.c
> @@ -143,7 +143,8 @@ struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
>          }
>
>          for (i = 0; i < pool_size; i++) {
> -            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
> +            new_pool->controls[i].worker =
> +                ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
>          }
>          ovs_list_push_back(&worker_pools, &new_pool->list_node);
>      }
> @@ -386,6 +387,9 @@ static void worker_pool_hook(void *aux OVS_UNUSED) {
>          for (i = 0; i < pool->size ; i++) {
>              sem_post(pool->controls[i].fire);
>          }
> +        for (i = 0; i < pool->size ; i++) {
> +            pthread_join(pool->controls[i].worker, NULL);
> +        }
>          for (i = 0; i < pool->size ; i++) {
>              sem_close(pool->controls[i].fire);
>              sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
> index 8db61eaba..d62ca3da5 100644
> --- a/lib/ovn-parallel-hmap.h
> +++ b/lib/ovn-parallel-hmap.h
> @@ -82,6 +82,7 @@ struct worker_control {
>      struct ovs_mutex mutex; /* Guards the data. */
>      void *data; /* Pointer to data to be processed. */
>      void *workload; /* back-pointer to the worker pool structure. */
> +    pthread_t worker;
>  };
>
>  struct worker_pool {
>
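The shutdown ordering that this incremental fix enforces can be illustrated with a small standalone sketch. It is not the OVN code: it assumes unnamed POSIX semaphores (sem_init()/sem_destroy()) instead of the named sem_open() semaphores used by the patch, and demo_control, demo_worker() and demo_pool_shutdown() are hypothetical names. What it shows is the order of operations in the exit path: set the exit flag, post every worker's start semaphore, pthread_join() every worker, and only then destroy the semaphores, so that no worker can still be inside sem_wait() on a semaphore that has already been torn down. Without the join step the main thread may close a semaphore that a slow worker is still waiting on, which is one plausible explanation for a crash inside sem_wait() like the one in the backtrace above.

```c
#include <errno.h>
#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical, simplified stand-in for struct worker_control. */
struct demo_control {
    sem_t fire;          /* Posted by the main thread to wake the worker. */
    pthread_t worker;    /* Recorded so that shutdown can join the thread. */
};

static atomic_bool demo_must_exit;  /* Plays the role of workers_must_exit. */
static atomic_int demo_exited;      /* Counts workers that left their loop. */

static void *demo_worker(void *arg)
{
    struct demo_control *control = arg;

    for (;;) {
        int ret;

        /* Retry on EINTR, as wait_for_work() does in the patch. */
        do {
            ret = sem_wait(&control->fire);
        } while (ret == -1 && errno == EINTR);
        if (ret != 0 || atomic_load(&demo_must_exit)) {
            break;
        }
        /* A real worker would process its slice of the data here. */
    }
    atomic_fetch_add(&demo_exited, 1);
    return NULL;
}

/* Starts up to n workers, then shuts them down in the safe order.
 * Returns the number of workers that exited, or -1 if allocation fails. */
int demo_pool_shutdown(int n)
{
    struct demo_control *controls = calloc(n > 0 ? n : 1, sizeof *controls);
    int started = 0;
    int i;

    if (!controls) {
        return -1;
    }
    atomic_store(&demo_must_exit, false);
    atomic_store(&demo_exited, 0);

    for (i = 0; i < n; i++) {
        if (sem_init(&controls[i].fire, 0, 0) != 0) {
            break;
        }
        if (pthread_create(&controls[i].worker, NULL, demo_worker,
                           &controls[i]) != 0) {
            sem_destroy(&controls[i].fire);
            break;
        }
        started++;
    }

    atomic_store(&demo_must_exit, true);         /* 1. Ask workers to exit. */
    for (i = 0; i < started; i++) {
        sem_post(&controls[i].fire);             /* 2. Wake every worker. */
    }
    for (i = 0; i < started; i++) {
        pthread_join(controls[i].worker, NULL);  /* 3. Wait until it is gone. */
    }
    for (i = 0; i < started; i++) {
        sem_destroy(&controls[i].fire);          /* 4. Nobody can wait now. */
    }
    free(controls);
    return atomic_load(&demo_exited);
}
```

On a system where all threads can be created, demo_pool_shutdown(4) returns 4: each worker receives exactly one post, sees the flag and leaves its loop before its semaphore is destroyed. The semaphore also absorbs the case where a post arrives before the worker has reached sem_wait(), since the count is simply consumed later.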
I applied the above diff on top of patch 2 and did some tests. I see a big
improvement with this.

On my "Intel(R) Xeon(R) CPU E3-1220 v5 @ 3.00GHz" server, I saw only one
crash across multiple runs of the test suite.

On my work laptop (on which the tests used to hang earlier), all the tests
are passing now, but I see crashes here much more consistently. In each run
of the whole test suite (with make check -j5), I observed around 7 crashes.
This is still definitely an improvement compared to my previous runs with v14.

Here are the backtrace details of the core dumps I observed:
https://gist.github.com/numansiddique/5cab90ec4a1ee6e1adbfd3cd90eccf5a

Crash 1 and Crash 2 are frequent.

Let me know in case you want the core files.

Thanks
Numan

>
> >
> > All the test cases passed for me. So maybe something's wrong when
> > ovn-northd exits.
> > IMHO, these crashes should be addressed before these patches can be
> > considered.
> >
> > Thanks
> > Numan
> >
> >> ---
> >>  lib/automake.mk         |   2 +
> >>  lib/ovn-parallel-hmap.c | 455 ++++++++++++++++++++++++++++++++++++++++
> >>  lib/ovn-parallel-hmap.h | 301 ++++++++++++++++++++++++++
> >>  3 files changed, 758 insertions(+)
> >>  create mode 100644 lib/ovn-parallel-hmap.c
> >>  create mode 100644 lib/ovn-parallel-hmap.h
> >>
> >> diff --git a/lib/automake.mk b/lib/automake.mk
> >> index 250c7aefa..781be2109 100644
> >> --- a/lib/automake.mk
> >> +++ b/lib/automake.mk
> >> @@ -13,6 +13,8 @@ lib_libovn_la_SOURCES = \
> >>         lib/expr.c \
> >>         lib/extend-table.h \
> >>         lib/extend-table.c \
> >> +       lib/ovn-parallel-hmap.h \
> >> +       lib/ovn-parallel-hmap.c \
> >>         lib/ip-mcast-index.c \
> >>         lib/ip-mcast-index.h \
> >>         lib/mcast-group-index.c \
> >> diff --git a/lib/ovn-parallel-hmap.c b/lib/ovn-parallel-hmap.c
> >> new file mode 100644
> >> index 000000000..e83ae23cb
> >> --- /dev/null
> >> +++ b/lib/ovn-parallel-hmap.c
> >> @@ -0,0 +1,455 @@
> >> +/*
> >> + * Copyright (c) 2020 Red Hat, Inc.
> >> + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
> >> + *
> >> + * Licensed under the Apache License, Version 2.0 (the "License");
> >> + * you may not use this file except in compliance with the License.
> >> + * You may obtain a copy of the License at:
> >> + *
> >> + *     http://www.apache.org/licenses/LICENSE-2.0
> >> + *
> >> + * Unless required by applicable law or agreed to in writing, software
> >> + * distributed under the License is distributed on an "AS IS" BASIS,
> >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> >> + * See the License for the specific language governing permissions and
> >> + * limitations under the License.
> >> + */
> >> +
> >> +#include <config.h>
> >> +#include <stdint.h>
> >> +#include <string.h>
> >> +#include <stdlib.h>
> >> +#include <fcntl.h>
> >> +#include <unistd.h>
> >> +#include <errno.h>
> >> +#include <semaphore.h>
> >> +#include "fatal-signal.h"
> >> +#include "util.h"
> >> +#include "openvswitch/vlog.h"
> >> +#include "openvswitch/hmap.h"
> >> +#include "openvswitch/thread.h"
> >> +#include "ovn-parallel-hmap.h"
> >> +#include "ovs-atomic.h"
> >> +#include "ovs-thread.h"
> >> +#include "ovs-numa.h"
> >> +#include "random.h"
> >> +
> >> +VLOG_DEFINE_THIS_MODULE(ovn_parallel_hmap);
> >> +
> >> +#ifndef OVS_HAS_PARALLEL_HMAP
> >> +
> >> +#define WORKER_SEM_NAME "%x-%p-%x"
> >> +#define MAIN_SEM_NAME "%x-%p-main"
> >> +
> >> +/* These are accessed under mutex inside add_worker_pool().
> >> + * They do not need to be atomic.
> >> + */
> >> +
> >> +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false);
> >> +static bool can_parallelize = false;
> >> +
> >> +/* This is set only in the process of exit and the set is
> >> + * accompanied by a fence. It does not need to be atomic or be
> >> + * accessed under a lock.
> >> + */
> >> +
> >> +static bool workers_must_exit = false;
> >> +
> >> +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
> >> +
> >> +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
> >> +
> >> +static int pool_size;
> >> +
> >> +static int sembase;
> >> +
> >> +static void worker_pool_hook(void *aux OVS_UNUSED);
> >> +static void setup_worker_pools(bool force);
> >> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
> >> +                               void *fin_result, void *result_frags,
> >> +                               int index);
> >> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> >> +                               void *fin_result, void *result_frags,
> >> +                               int index);
> >> +
> >> +bool ovn_stop_parallel_processing(void)
> >> +{
> >> +    return workers_must_exit;
> >> +}
> >> +
> >> +bool ovn_can_parallelize_hashes(bool force_parallel)
> >> +{
> >> +    bool test = false;
> >> +
> >> +    if (atomic_compare_exchange_strong(
> >> +            &initial_pool_setup,
> >> +            &test,
> >> +            true)) {
> >> +        ovs_mutex_lock(&init_mutex);
> >> +        setup_worker_pools(force_parallel);
> >> +        ovs_mutex_unlock(&init_mutex);
> >> +    }
> >> +    return can_parallelize;
> >> +}
> >> +
> >> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *)){
> >> +
> >> +    struct worker_pool *new_pool = NULL;
> >> +    struct worker_control *new_control;
> >> +    bool test = false;
> >> +    int i;
> >> +    char sem_name[256];
> >> +
> >> +
> >> +    /* Belt and braces - initialize the pool system just in case
> >> +     * it is not yet initialized.
> >> +     */
> >> +
> >> +    if (atomic_compare_exchange_strong(
> >> +            &initial_pool_setup,
> >> +            &test,
> >> +            true)) {
> >> +        ovs_mutex_lock(&init_mutex);
> >> +        setup_worker_pools(false);
> >> +        ovs_mutex_unlock(&init_mutex);
> >> +    }
> >> +
> >> +    ovs_mutex_lock(&init_mutex);
> >> +    if (can_parallelize) {
> >> +        new_pool = xmalloc(sizeof(struct worker_pool));
> >> +        new_pool->size = pool_size;
> >> +        new_pool->controls = NULL;
> >> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> >> +        new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> >> +        if (new_pool->done == SEM_FAILED) {
> >> +            goto cleanup;
> >> +        }
> >> +
> >> +        new_pool->controls =
> >> +            xmalloc(sizeof(struct worker_control) * new_pool->size);
> >> +
> >> +        for (i = 0; i < new_pool->size; i++) {
> >> +            new_control = &new_pool->controls[i];
> >> +            new_control->id = i;
> >> +            new_control->done = new_pool->done;
> >> +            new_control->data = NULL;
> >> +            ovs_mutex_init(&new_control->mutex);
> >> +            new_control->finished = ATOMIC_VAR_INIT(false);
> >> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> >> +            new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0);
> >> +            if (new_control->fire == SEM_FAILED) {
> >> +                goto cleanup;
> >> +            }
> >> +        }
> >> +
> >> +        for (i = 0; i < pool_size; i++) {
> >> +            ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
> >> +        }
> >> +        ovs_list_push_back(&worker_pools, &new_pool->list_node);
> >> +    }
> >> +    ovs_mutex_unlock(&init_mutex);
> >> +    return new_pool;
> >> +cleanup:
> >> +
> >> +    /* Something went wrong when opening semaphores. In this case
> >> +     * it is better to shut off parallel processing altogether
> >> +     */
> >> +
> >> +    VLOG_INFO("Failed to initialize parallel processing, error %d", errno);
> >> +    can_parallelize = false;
> >> +    if (new_pool->controls) {
> >> +        for (i = 0; i < new_pool->size; i++) {
> >> +            if (new_pool->controls[i].fire != SEM_FAILED) {
> >> +                sem_close(new_pool->controls[i].fire);
> >> +                sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i);
> >> +                sem_unlink(sem_name);
> >> +                break; /* semaphores past this one are uninitialized */
> >> +            }
> >> +        }
> >> +    }
> >> +    if (new_pool->done != SEM_FAILED) {
> >> +        sem_close(new_pool->done);
> >> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool);
> >> +        sem_unlink(sem_name);
> >> +    }
> >> +    ovs_mutex_unlock(&init_mutex);
> >> +    return NULL;
> >> +}
> >> +
> >> +
> >> +/* Initializes 'hmap' as an empty hash table with mask N. */
> >> +void
> >> +ovn_fast_hmap_init(struct hmap *hmap, ssize_t mask)
> >> +{
> >> +    size_t i;
> >> +
> >> +    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
> >> +    hmap->one = NULL;
> >> +    hmap->mask = mask;
> >> +    hmap->n = 0;
> >> +    for (i = 0; i <= hmap->mask; i++) {
> >> +        hmap->buckets[i] = NULL;
> >> +    }
> >> +}
> >> +
> >> +/* Initializes 'hmap' as an empty hash table of size X.
> >> + * Intended for use in parallel processing so that all
> >> + * fragments used to store results in a parallel job
> >> + * are the same size.
> >> + */
> >> +void
> >> +ovn_fast_hmap_size_for(struct hmap *hmap, int size)
> >> +{
> >> +    size_t mask;
> >> +    mask = size / 2;
> >> +    mask |= mask >> 1;
> >> +    mask |= mask >> 2;
> >> +    mask |= mask >> 4;
> >> +    mask |= mask >> 8;
> >> +    mask |= mask >> 16;
> >> +#if SIZE_MAX > UINT32_MAX
> >> +    mask |= mask >> 32;
> >> +#endif
> >> +
> >> +    /* If we need to dynamically allocate buckets we might as well allocate at
> >> +     * least 4 of them. */
> >> +    mask |= (mask & 1) << 1;
> >> +
> >> +    fast_hmap_init(hmap, mask);
> >> +}
> >> +
> >> +/* Run a thread pool which uses a callback function to process results
> >> + */
> >> +
> >> +void ovn_run_pool_callback(struct worker_pool *pool,
> >> +                           void *fin_result, void *result_frags,
> >> +                           void (*helper_func)(struct worker_pool *pool,
> >> +                                               void *fin_result,
> >> +                                               void *result_frags, int index))
> >> +{
> >> +    int index, completed;
> >> +
> >> +    /* Ensure that all worker threads see the same data as the
> >> +     * main thread.
> >> +     */
> >> +
> >> +    atomic_thread_fence(memory_order_acq_rel);
> >> +
> >> +    /* Start workers */
> >> +
> >> +    for (index = 0; index < pool->size; index++) {
> >> +        sem_post(pool->controls[index].fire);
> >> +    }
> >> +
> >> +    completed = 0;
> >> +
> >> +    do {
> >> +        bool test;
> >> +        /* Note - we do not loop on semaphore until it reaches
> >> +         * zero, but on pool size/remaining workers.
> >> +         * This is by design. If the inner loop can handle
> >> +         * completion for more than one worker within an iteration
> >> +         * it will do so to ensure no additional iterations and
> >> +         * waits once all of them are done.
> >> +         *
> >> +         * This may result in us having an initial positive value
> >> +         * of the semaphore when the pool is invoked the next time.
> >> +         * This is harmless - the loop will spin up a couple of times
> >> +         * doing nothing while the workers are processing their data
> >> +         * slices.
> >> +         */
> >> +        wait_for_work_completion(pool);
> >> +        for (index = 0; index < pool->size; index++) {
> >> +            test = true;
> >> +            /* If the worker has marked its data chunk as complete,
> >> +             * invoke the helper function to combine the results of
> >> +             * this worker into the main result.
> >> +             *
> >> +             * The worker must invoke an appropriate memory fence
> >> +             * (most likely acq_rel) to ensure that the main thread
> >> +             * sees all of the results produced by the worker.
> >> +             */
> >> +            if (atomic_compare_exchange_weak(
> >> +                    &pool->controls[index].finished,
> >> +                    &test,
> >> +                    false)) {
> >> +                if (helper_func) {
> >> +                    (helper_func)(pool, fin_result, result_frags, index);
> >> +                }
> >> +                completed++;
> >> +                pool->controls[index].data = NULL;
> >> +            }
> >> +        }
> >> +    } while (completed < pool->size);
> >> +}
> >> +
> >> +/* Run a thread pool - basic, does not do results processing.
> >> + */
> >> +
> >> +void ovn_run_pool(struct worker_pool *pool)
> >> +{
> >> +    run_pool_callback(pool, NULL, NULL, NULL);
> >> +}
> >> +
> >> +/* Brute force merge of a hashmap into another hashmap.
> >> + * Intended for use in parallel processing. The destination
> >> + * hashmap MUST be the same size as the one being merged.
> >> + *
> >> + * This can be achieved by pre-allocating them to correct size
> >> + * and using hmap_insert_fast() instead of hmap_insert()
> >> + */
> >> +
> >> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc)
> >> +{
> >> +    size_t i;
> >> +
> >> +    ovs_assert(inc->mask == dest->mask);
> >> +
> >> +    if (!inc->n) {
> >> +        /* Request to merge an empty frag, nothing to do */
> >> +        return;
> >> +    }
> >> +
> >> +    for (i = 0; i <= dest->mask; i++) {
> >> +        struct hmap_node **dest_bucket = &dest->buckets[i];
> >> +        struct hmap_node **inc_bucket = &inc->buckets[i];
> >> +        if (*inc_bucket != NULL) {
> >> +            struct hmap_node *last_node = *inc_bucket;
> >> +            while (last_node->next != NULL) {
> >> +                last_node = last_node->next;
> >> +            }
> >> +            last_node->next = *dest_bucket;
> >> +            *dest_bucket = *inc_bucket;
> >> +            *inc_bucket = NULL;
> >> +        }
> >> +    }
> >> +    dest->n += inc->n;
> >> +    inc->n = 0;
> >> +}
> >> +
> >> +/* Run a thread pool which gathers results in an array
> >> + * of hashes. Merge results.
> >> + */
> >> +
> >> +
> >> +void ovn_run_pool_hash(
> >> +        struct worker_pool *pool,
> >> +        struct hmap *result,
> >> +        struct hmap *result_frags)
> >> +{
> >> +    run_pool_callback(pool, result, result_frags, merge_hash_results);
> >> +}
> >> +
> >> +/* Run a thread pool which gathers results in an array of lists.
> >> + * Merge results.
> >> + */
> >> +void ovn_run_pool_list(
> >> +        struct worker_pool *pool,
> >> +        struct ovs_list *result,
> >> +        struct ovs_list *result_frags)
> >> +{
> >> +    run_pool_callback(pool, result, result_frags, merge_list_results);
> >> +}
> >> +
> >> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl)
> >> +{
> >> +    int i;
> >> +    if (hrl->mask != lflows->mask) {
> >> +        if (hrl->row_locks) {
> >> +            free(hrl->row_locks);
> >> +        }
> >> +        hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1);
> >> +        hrl->mask = lflows->mask;
> >> +        for (i = 0; i <= lflows->mask; i++) {
> >> +            ovs_mutex_init(&hrl->row_locks[i]);
> >> +        }
> >> +    }
> >> +}
> >> +
> >> +static void worker_pool_hook(void *aux OVS_UNUSED) {
> >> +    int i;
> >> +    static struct worker_pool *pool;
> >> +    char sem_name[256];
> >> +
> >> +    workers_must_exit = true;
> >> +
> >> +    /* All workers must honour the must_exit flag and check for it regularly.
> >> +     * We can make it atomic and check it via atomics in workers, but that
> >> +     * is not really necessary as it is set just once - when the program
> >> +     * terminates. So we use a fence which is invoked before exiting instead.
> >> +     */
> >> +    atomic_thread_fence(memory_order_acq_rel);
> >> +
> >> +    /* Wake up the workers after the must_exit flag has been set */
> >> +
> >> +    LIST_FOR_EACH (pool, list_node, &worker_pools) {
> >> +        for (i = 0; i < pool->size ; i++) {
> >> +            sem_post(pool->controls[i].fire);
> >> +        }
> >> +        for (i = 0; i < pool->size ; i++) {
> >> +            sem_close(pool->controls[i].fire);
> >> +            sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i);
> >> +            sem_unlink(sem_name);
> >> +        }
> >> +        sem_close(pool->done);
> >> +        sprintf(sem_name, MAIN_SEM_NAME, sembase, pool);
> >> +        sem_unlink(sem_name);
> >> +    }
> >> +}
> >> +
> >> +static void setup_worker_pools(bool force) {
> >> +    int cores, nodes;
> >> +
> >> +    nodes = ovs_numa_get_n_numas();
> >> +    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
> >> +        nodes = 1;
> >> +    }
> >> +    cores = ovs_numa_get_n_cores();
> >> +
> >> +    /* If there is no NUMA config, use 4 cores.
> >> +     * If there is NUMA config use half the cores on
> >> +     * one node so that the OS does not start pushing
> >> +     * threads to other nodes.
> >> +     */
> >> +    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
> >> +        /* If there is no NUMA we can try the ovs-threads routine.
> >> +         * It falls back to sysconf and/or affinity mask.
> >> +         */
> >> +        cores = count_cpu_cores();
> >> +        pool_size = cores;
> >> +    } else {
> >> +        pool_size = cores / nodes;
> >> +    }
> >> +    if ((pool_size < 4) && force) {
> >> +        pool_size = 4;
> >> +    }
> >> +    can_parallelize = (pool_size >= 3);
> >> +    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
> >> +    sembase = random_uint32();
> >> +}
> >> +
> >> +static void merge_list_results(struct worker_pool *pool OVS_UNUSED,
> >> +                               void *fin_result, void *result_frags,
> >> +                               int index)
> >> +{
> >> +    struct ovs_list *result = (struct ovs_list *)fin_result;
> >> +    struct ovs_list *res_frags = (struct ovs_list *)result_frags;
> >> +
> >> +    if (!ovs_list_is_empty(&res_frags[index])) {
> >> +        ovs_list_splice(result->next,
> >> +                ovs_list_front(&res_frags[index]), &res_frags[index]);
> >> +    }
> >> +}
> >> +
> >> +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED,
> >> +                               void *fin_result, void *result_frags,
> >> +                               int index)
> >> +{
> >> +    struct hmap *result = (struct hmap *)fin_result;
> >> +    struct hmap *res_frags = (struct hmap *)result_frags;
> >> +
> >> +    fast_hmap_merge(result, &res_frags[index]);
> >> +    hmap_destroy(&res_frags[index]);
> >> +}
> >> +
> >> +#endif
> >> diff --git a/lib/ovn-parallel-hmap.h b/lib/ovn-parallel-hmap.h
> >> new file mode 100644
> >> index 000000000..8db61eaba
> >> --- /dev/null
> >> +++ b/lib/ovn-parallel-hmap.h
> >> @@ -0,0 +1,301 @@
> >> +/*
> >> + * Copyright (c) 2020 Red Hat, Inc.
> >> + *
> >> + * Licensed under the Apache License, Version 2.0 (the "License");
> >> + * you may not use this file except in compliance with the License.
> >> + * You may obtain a copy of the License at:
> >> + *
> >> + *     http://www.apache.org/licenses/LICENSE-2.0
> >> + *
> >> + * Unless required by applicable law or agreed to in writing, software
> >> + * distributed under the License is distributed on an "AS IS" BASIS,
> >> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
> >> + * See the License for the specific language governing permissions and
> >> + * limitations under the License.
> >> + */
> >> +
> >> +#ifndef OVN_PARALLEL_HMAP
> >> +#define OVN_PARALLEL_HMAP 1
> >> +
> >> +/* if the parallel macros are defined by hmap.h or any other ovs define
> >> + * we skip over the ovn specific definitions.
> >> + */
> >> +
> >> +#ifdef __cplusplus
> >> +extern "C" {
> >> +#endif
> >> +
> >> +#include <stdbool.h>
> >> +#include <stdlib.h>
> >> +#include <semaphore.h>
> >> +#include <errno.h>
> >> +#include "openvswitch/util.h"
> >> +#include "openvswitch/hmap.h"
> >> +#include "openvswitch/thread.h"
> >> +#include "ovs-atomic.h"
> >> +
> >> +/* Process this include only if OVS does not supply parallel definitions
> >> + */
> >> +
> >> +#ifdef OVS_HAS_PARALLEL_HMAP
> >> +
> >> +#include "parallel-hmap.h"
> >> +
> >> +#else
> >> +
> >> +
> >> +#ifdef __clang__
> >> +#pragma clang diagnostic push
> >> +#pragma clang diagnostic ignored "-Wthread-safety"
> >> +#endif
> >> +
> >> +
> >> +/* A version of the HMAP_FOR_EACH macro intended for iterating as part
> >> + * of parallel processing.
> >> + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE
> >> + * and will iterate hash buckets ThreadID, ThreadID + step,
> >> + * ThreadID + step * 2, etc. The actual macro accepts
> >> + * ThreadID + step * i as the JOBID parameter.
> >> + */
> >> +
> >> +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
> >> +   for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
> >> +        (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
> >> +       || ((NODE = NULL), false); \
> >> +       ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
> >> +
> >> +/* We do not have a SAFE version of the macro, because the hash size is not
> >> + * atomic and hash removal operations would need to be wrapped with
> >> + * locks. This will defeat most of the benefits from doing anything in
> >> + * parallel.
> >> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements,
> >> + * each thread should store them in a temporary list result instead, merging
> >> + * the lists into a combined result at the end */
> >> +
> >> +/* Work "Handle" */
> >> +
> >> +struct worker_control {
> >> +    int id; /* Used as a modulo when iterating over a hash. */
> >> +    atomic_bool finished; /* Set to true after a chunk of work is complete. */
> >> +    sem_t *fire; /* Work start semaphore - sem_post starts the worker. */
> >> +    sem_t *done; /* Work completion semaphore - sem_post on completion. */
> >> +    struct ovs_mutex mutex; /* Guards the data. */
> >> +    void *data; /* Pointer to data to be processed. */
> >> +    void *workload; /* back-pointer to the worker pool structure. */
> >> +};
> >> +
> >> +struct worker_pool {
> >> +    int size;   /* Number of threads in the pool. */
> >> +    struct ovs_list list_node; /* List of pools - used in cleanup/exit. */
> >> +    struct worker_control *controls; /* "Handles" in this pool. */
> >> +    sem_t *done; /* Work completion semaphore. */
> >> +};
> >> +
> >> +/* Add a worker pool for thread function start() which expects a pointer to
> >> + * a worker_control structure as an argument. */
> >> +
> >> +struct worker_pool *ovn_add_worker_pool(void *(*start)(void *));
> >> +
> >> +/* Setting this to true will make all processing threads exit */
> >> +
> >> +bool ovn_stop_parallel_processing(void);
> >> +
> >> +/* Build a hmap pre-sized for size elements */
> >> +
> >> +void ovn_fast_hmap_size_for(struct hmap *hmap, int size);
> >> +
> >> +/* Build a hmap with a mask equal to size */
> >> +
> >> +void ovn_fast_hmap_init(struct hmap *hmap, ssize_t size);
> >> +
> >> +/* Brute-force merge a hmap into hmap.
> >> + * Dest and inc have to have the same mask. The merge is performed
> >> + * by extending the element list for bucket N in the dest hmap with the list
> >> + * from bucket N in inc.
> >> + */
> >> +
> >> +void ovn_fast_hmap_merge(struct hmap *dest, struct hmap *inc);
> >> +
> >> +/* Run a pool, without any default processing of results.
> >> + */
> >> +
> >> +void ovn_run_pool(struct worker_pool *pool);
> >> +
> >> +/* Run a pool, merge results from hash frags into a final hash result.
> >> + * The hash frags must be pre-sized to the same size.
> >> + */
> >> +
> >> +void ovn_run_pool_hash(struct worker_pool *pool,
> >> +                       struct hmap *result, struct hmap *result_frags);
> >> +/* Run a pool, merge results from list frags into a final list result.
> >> + */
> >> +
> >> +void ovn_run_pool_list(struct worker_pool *pool,
> >> +                       struct ovs_list *result, struct ovs_list *result_frags);
> >> +
> >> +/* Run a pool, call a callback function to perform processing of results.
> >> + */
> >> +
> >> +void ovn_run_pool_callback(struct worker_pool *pool, void *fin_result,
> >> +                           void *result_frags,
> >> +                           void (*helper_func)(struct worker_pool *pool,
> >> +                           void *fin_result, void *result_frags, int index));
> >> +
> >> +
> >> +/* Returns the first node in 'hmap' in the bucket in which the given 'hash'
> >> + * would land, or a null pointer if that bucket is empty. */
> >> +
> >> +static inline struct hmap_node *
> >> +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
> >> +{
> >> +    return hmap->buckets[num];
> >> +}
> >> +
> >> +static inline struct hmap_node *
> >> +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
> >> +{
> >> +    size_t i;
> >> +    for (i = start; i <= hmap->mask; i+= pool_size) {
> >> +        struct hmap_node *node = hmap->buckets[i];
> >> +        if (node) {
> >> +            return node;
> >> +        }
> >> +    }
> >> +    return NULL;
> >> +}
> >> +
> >> +/* Returns the first node in 'hmap', as expected by thread with job_id
> >> + * for parallel processing in arbitrary order, or a null pointer if
> >> + * the slice of 'hmap' for that job_id is empty. */
> >> +static inline struct hmap_node *
> >> +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
> >> +{
> >> +    return parallel_hmap_next__(hmap, job_id, pool_size);
> >> +}
> >> +
> >> +/* Returns the next node in the slice of 'hmap' following 'node',
> >> + * in arbitrary order, or a null pointer if 'node' is the last node in
> >> + * the 'hmap' slice.
> >> + *
> >> + */
> >> +static inline struct hmap_node *
> >> +parallel_hmap_next(const struct hmap *hmap,
> >> +                   const struct hmap_node *node, ssize_t pool_size)
> >> +{
> >> +    return (node->next
> >> +            ? node->next
> >> +            : parallel_hmap_next__(hmap,
> >> +                (node->hash & hmap->mask) + pool_size, pool_size));
> >> +}
> >> +
> >> +static inline void post_completed_work(struct worker_control *control)
> >> +{
> >> +    atomic_thread_fence(memory_order_acq_rel);
> >> +    atomic_store_relaxed(&control->finished, true);
> >> +    sem_post(control->done);
> >> +}
> >> +
> >> +static inline void wait_for_work(struct worker_control *control)
> >> +{
> >> +    int ret;
> >> +
> >> +    do {
> >> +        ret = sem_wait(control->fire);
> >> +    } while ((ret == -1) && (errno == EINTR));
> >> +    ovs_assert(ret == 0);
> >> +}
> >> +static inline void wait_for_work_completion(struct worker_pool *pool)
> >> +{
> >> +    int ret;
> >> +
> >> +    do {
> >> +        ret = sem_wait(pool->done);
> >> +    } while ((ret == -1) && (errno == EINTR));
> >> +    ovs_assert(ret == 0);
> >> +}
> >> +
> >> +
> >> +/* Hash per-row locking support - to be used only in conjunction
> >> + * with fast hash inserts. Normal hash inserts may resize the hash
> >> + * rendering the locking invalid.
> >> + */
> >> +
> >> +struct hashrow_locks {
> >> +    ssize_t mask;
> >> +    struct ovs_mutex *row_locks;
> >> +};
> >> +
> >> +/* Update a hash row locks structure to match the current hash size */
> >> +
> >> +void ovn_update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl);
> >> +
> >> +/* Lock a hash row */
> >> +
> >> +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
> >> +{
> >> +    ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]);
> >> +}
> >> +
> >> +/* Unlock a hash row */
> >> +
> >> +static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash)
> >> +{
> >> +    ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]);
> >> +}
> >> +/* Init the row locks structure */
> >> +
> >> +static inline void init_hash_row_locks(struct hashrow_locks *hrl)
> >> +{
> >> +    hrl->mask = 0;
> >> +    hrl->row_locks = NULL;
> >> +}
> >> +
> >> +bool ovn_can_parallelize_hashes(bool force_parallel);
> >> +
> >> +/* Use the OVN library functions for stuff which OVS has not defined
> >> + * If OVS has defined these, they will still compile using the OVN
> >> + * local names, but will be dropped by the linker in favour of the OVS
> >> + * supplied functions.
> >> + */
> >> +
> >> +#define update_hashrow_locks(lflows, hrl) ovn_update_hashrow_locks(lflows, hrl)
> >> +
> >> +#define can_parallelize_hashes(force) ovn_can_parallelize_hashes(force)
> >> +
> >> +#define stop_parallel_processing() ovn_stop_parallel_processing()
> >> +
> >> +#define add_worker_pool(start) ovn_add_worker_pool(start)
> >> +
> >> +#define fast_hmap_size_for(hmap, size) ovn_fast_hmap_size_for(hmap, size)
> >> +
> >> +#define fast_hmap_init(hmap, size) ovn_fast_hmap_init(hmap, size)
> >> +
> >> +#define fast_hmap_merge(dest, inc) ovn_fast_hmap_merge(dest, inc)
> >> +
> >> +#define hmap_merge(dest, inc) ovn_hmap_merge(dest, inc)
> >> +
> >> +#define ovn_run_pool(pool) ovn_run_pool(pool)
> >> +
> >> +#define run_pool_hash(pool, result, result_frags) \
> >> +    ovn_run_pool_hash(pool, result, result_frags)
> >> +
> >> +#define run_pool_list(pool, result, result_frags) \
> >> +    ovn_run_pool_list(pool, result, result_frags)
> >> +
> >> +#define run_pool_callback(pool, fin_result, result_frags, helper_func) \
> >> +    ovn_run_pool_callback(pool, fin_result, result_frags, helper_func)
> >> +
> >> +
> >> +
> >> +#ifdef __clang__
> >> +#pragma clang diagnostic pop
> >> +#endif
> >> +
> >> +#endif
> >> +
> >> +#ifdef __cplusplus
> >> +}
> >> +#endif
> >> +
> >> +
> >> +#endif /* lib/fasthmap.h */
> >> --
> >> 2.20.1
> >>
> >> _______________________________________________
> >> dev mailing list
> >> [email protected]
> >> https://mail.openvswitch.org/mailman/listinfo/ovs-dev
> >>
> >
>
> --
> Anton R. Ivanov
> Cambridgegreys Limited. Registered in England. Company Number 10273661
> https://www.cambridgegreys.com/
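The fragment-and-merge scheme behind ovn_fast_hmap_merge(), together with the strided bucket walk described for HMAP_FOR_EACH_IN_PARALLEL and implemented by parallel_hmap_first()/parallel_hmap_next(), can be illustrated without OVS. The following is a hedged, standalone approximation rather than the patch itself: demo_node, demo_map and the demo_* functions are hypothetical and only loosely modelled on struct hmap, and the sketch assumes a power-of-two bucket count so that a node's bucket is hash & mask. Per-worker fragments are filled without resizing, merged bucket by bucket into a result with the same mask, and then job J out of pool_size walks buckets J, J + pool_size, J + 2 * pool_size, and so on.

```c
#include <stddef.h>
#include <stdlib.h>

/* Hypothetical chained hash map loosely modelled on struct hmap:
 * the bucket count is mask + 1, which is assumed to be a power of two. */
struct demo_node {
    struct demo_node *next;
    size_t hash;
};

struct demo_map {
    struct demo_node **buckets;
    size_t mask;
    size_t n;
};

static int demo_map_init(struct demo_map *map, size_t mask)
{
    map->buckets = calloc(mask + 1, sizeof *map->buckets);
    map->mask = mask;
    map->n = 0;
    return map->buckets ? 0 : -1;
}

/* Never resizes (in the spirit of hmap_insert_fast()), so every fragment
 * keeps the same mask as the final result. */
static void demo_map_insert_fast(struct demo_map *map, struct demo_node *node)
{
    struct demo_node **bucket = &map->buckets[node->hash & map->mask];

    node->next = *bucket;
    *bucket = node;
    map->n++;
}

/* Bucket-wise merge in the style of ovn_fast_hmap_merge(): the chain of
 * bucket i in 'inc' is spliced in front of bucket i in 'dest'. */
static void demo_map_merge(struct demo_map *dest, struct demo_map *inc)
{
    if (dest->mask != inc->mask) {
        abort();  /* The patch uses ovs_assert() for this check. */
    }
    for (size_t i = 0; i <= dest->mask; i++) {
        struct demo_node *head = inc->buckets[i];

        if (head) {
            struct demo_node *last = head;

            while (last->next) {
                last = last->next;
            }
            last->next = dest->buckets[i];
            dest->buckets[i] = head;
            inc->buckets[i] = NULL;
        }
    }
    dest->n += inc->n;
    inc->n = 0;
}

/* Counts the nodes in the slice owned by 'job_id': buckets job_id,
 * job_id + pool_size, job_id + 2 * pool_size, ... */
static size_t demo_count_slice(const struct demo_map *map, size_t job_id,
                               size_t pool_size)
{
    size_t count = 0;

    for (size_t i = job_id; i <= map->mask; i += pool_size) {
        for (const struct demo_node *node = map->buckets[i]; node;
             node = node->next) {
            count++;
        }
    }
    return count;
}

/* Fills two fragments with 16 nodes, merges them into one result and
 * returns the total number of nodes visited by pool_size strided jobs. */
static size_t demo_merge_example(size_t pool_size)
{
    static struct demo_node nodes[16];
    struct demo_map dest, frags[2];
    size_t total = 0;
    int err = demo_map_init(&dest, 7);

    err |= demo_map_init(&frags[0], 7);
    err |= demo_map_init(&frags[1], 7);
    if (!err && pool_size > 0) {
        for (size_t h = 0; h < 16; h++) {
            nodes[h].hash = h;
            demo_map_insert_fast(&frags[h % 2], &nodes[h]);
        }
        demo_map_merge(&dest, &frags[0]);
        demo_map_merge(&dest, &frags[1]);
        for (size_t job = 0; job < pool_size; job++) {
            total += demo_count_slice(&dest, job, pool_size);
        }
    }
    free(dest.buckets);
    free(frags[0].buckets);
    free(frags[1].buckets);
    return total;
}
```

Because all maps share the same mask, a node sits in bucket i of its fragment and in bucket i of the result alike, so whole chains can be spliced without rehashing; this is why the patch requires the fragments to be pre-sized identically and filled with hmap_insert_fast(). With this data, demo_merge_example(3) returns 16: the three strided slices are disjoint and together cover all eight buckets.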
