On Fri, Jan 29, 2021 at 7:08 PM <[email protected]> wrote:
>
> From: Anton Ivanov <[email protected]>
>
> Adds functionality needed to walk a hash in parallel where thread ID
> N out of a pool sized M is responsible for processing all elements
> in buckets N, N+M, N+M*2, etc
>
> Signed-off-by: Anton Ivanov <[email protected]>
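(As a concrete illustration of the striding described above -- not code from the
patch, and process_node() is only a placeholder -- worker N of an M-thread pool
walks its share of the buckets roughly like this:

    /* Worker N of M visits buckets N, N + M, N + 2M, ... so that no two
     * workers ever touch the same bucket of the shared hash. */
    for (size_t i = N; i <= hmap->mask; i += M) {
        for (struct hmap_node *node = hmap->buckets[i]; node;
             node = node->next) {
            process_node(node);   /* placeholder for the per-element work */
        }
    }

With M = 4 and 16 buckets, for example, worker 1 visits buckets 1, 5, 9 and 13.)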
Hi Anton, Please see below for a few comments. Thanks Numan > --- > include/openvswitch/hmap.h | 2 + > lib/automake.mk | 2 + > lib/parallel-hmap.c | 410 +++++++++++++++++++++++++++++++++++++ > lib/parallel-hmap.h | 238 +++++++++++++++++++++ > 4 files changed, 652 insertions(+) > create mode 100644 lib/parallel-hmap.c > create mode 100644 lib/parallel-hmap.h > > diff --git a/include/openvswitch/hmap.h b/include/openvswitch/hmap.h > index 4e001cc69..6aed568e1 100644 > --- a/include/openvswitch/hmap.h > +++ b/include/openvswitch/hmap.h > @@ -17,6 +17,8 @@ > #ifndef HMAP_H > #define HMAP_H 1 > > +#define OVS_HAS_PARALLEL_HMAP 1 > + > #include <stdbool.h> > #include <stdlib.h> > #include "openvswitch/util.h" > diff --git a/lib/automake.mk b/lib/automake.mk > index 39afbff9d..99f7bb8d4 100644 > --- a/lib/automake.mk > +++ b/lib/automake.mk > @@ -122,6 +122,8 @@ lib_libopenvswitch_la_SOURCES = \ > lib/dynamic-string.c \ > lib/entropy.c \ > lib/entropy.h \ > + lib/parallel-hmap.h \ > + lib/parallel-hmap.c \ > lib/fat-rwlock.c \ > lib/fat-rwlock.h \ > lib/fatal-signal.c \ > diff --git a/lib/parallel-hmap.c b/lib/parallel-hmap.c > new file mode 100644 > index 000000000..a923a6142 > --- /dev/null > +++ b/lib/parallel-hmap.c > @@ -0,0 +1,410 @@ > +/* > + * Copyright (c) 2020 Red Hat, Inc. > + * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc. > + * > + * Licensed under the Apache License, Version 2.0 (the "License"); > + * you may not use this file except in compliance with the License. > + * You may obtain a copy of the License at: > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +#include <config.h> > +#include <stdint.h> > +#include <string.h> > +#include <stdlib.h> > +#include <unistd.h> > +#include <semaphore.h> > +#include <fcntl.h> > +#include "fatal-signal.h" > +#include "util.h" > +#include "openvswitch/vlog.h" > +#include "openvswitch/hmap.h" > +#include "openvswitch/thread.h" > +#include "hmapx.h" > +#include "ovs-atomic.h" > +#include "ovs-thread.h" > +#include "ovs-numa.h" > +#include "random.h" > +#include "parallel-hmap.h" > + > +VLOG_DEFINE_THIS_MODULE(parallel_hmap); > + > +#define WORKER_SEM_NAME "%x-%p-%x" > +#define MAIN_SEM_NAME "%x-%p-main" > + > + > +/* These are accessed under mutex inside add_worker_pool(). > + * They do not need to be atomic. > + */ > + > +static atomic_bool initial_pool_setup = ATOMIC_VAR_INIT(false); > +static bool can_parallelize = false; > + > +/* This is set only in the process of exit and the set is > + * accompanied by a fence. It does not need to be atomic or be > + * accessed under a lock. > + */ > + > +static bool workers_must_exit = false; > + > +static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools); > + > +static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER; > + > +static int pool_size; > + > +static int sembase; > + > +static void worker_pool_hook(void *aux OVS_UNUSED) { > + int i; > + static struct worker_pool *pool; > + char sem_name[256]; > + > + workers_must_exit = true; > + > + /* All workers must honour the must_exit flag and check for it regularly. 
> + * We can make it atomic and check it via atomics in workers, but that > + * is not really necessary as it is set just once - when the program > + * terminates. So we use a fence which is invoked before exiting instead. > + */ > + atomic_thread_fence(memory_order_acq_rel); > + > + /* Wake up the workers after the must_exit flag has been set */ > + > + LIST_FOR_EACH (pool, list_node, &worker_pools) { > + for (i = 0; i < pool->size ; i++) { > + sem_post(pool->controls[i].fire); > + } > + for (i = 0; i < pool->size ; i++) { > + sem_close(pool->controls[i].fire); > + sprintf(sem_name, WORKER_SEM_NAME, sembase, pool, i); > + sem_unlink(sem_name); > + } > + sem_close(pool->done); > + sprintf(sem_name, MAIN_SEM_NAME, sembase, pool); > + sem_unlink(sem_name); > + } > +} > + > +static void setup_worker_pools(void) { > + int cores, nodes; > + > + nodes = ovs_numa_get_n_numas(); > + if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) { > + nodes = 1; > + } > + cores = ovs_numa_get_n_cores(); > + > + /* If there is no NUMA config, use 4 cores. > + * If there is NUMA config use half the cores on > + * one node so that the OS does not start pushing > + * threads to other nodes. > + */ > + if (cores == OVS_CORE_UNSPEC || cores <= 0) { > + /* If there is no NUMA we can try the ovs-threads routine. > + * It falls back to sysconf and/or affinity mask. > + */ > + cores = count_cpu_cores(); > + pool_size = cores; > + } else { > + pool_size = cores / nodes; > + } > + if (pool_size > 16) { > + pool_size = 16; > + } > + can_parallelize = (pool_size >= 3); > + fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true); > + sembase = random_uint32(); > +} > + > +bool stop_parallel_processing(void) > +{ > + return workers_must_exit; > +} > + > +bool can_parallelize_hashes(void) > +{ > + bool test = false; > + > + if (atomic_compare_exchange_strong( > + &initial_pool_setup, > + &test, > + true)) { > + ovs_mutex_lock(&init_mutex); > + setup_worker_pools(); > + ovs_mutex_unlock(&init_mutex); > + } > + return can_parallelize; > +} Can you please define the public functions first and then the static functions. That is the recommended way in the coding guidelines - https://github.com/ovn-org/ovn/blob/master/Documentation/internals/contributing/coding-style.rst (section Functions) I would suggest having a function - ovn_fast_processing_init(bool force_parallelize) which does the job of 'can_parallelize_hashes()'. If force_parallelize is true, then the function would set can_parallelize to true and set the pool size to a default value (may be 4) Feel free to change the function name to a more appropriate one if you prefer. A user of the library can then call can_parallelize_hashes() to know if parallelization can be enabled or not. I would also suggest to prefix the name of the public functions with ovn_fastp to indicate that they are related to hmap parallelization. Like add_worker_pool() can be ovn_fastp_setup_worker_pool. 
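To make the suggestion concrete, something along these lines (a sketch only;
the function name and the default pool size of 4 are just the proposal above,
not existing code):

    void
    ovn_fast_processing_init(bool force_parallelize)
    {
        bool test = false;

        if (atomic_compare_exchange_strong(&initial_pool_setup, &test, true)) {
            ovs_mutex_lock(&init_mutex);
            setup_worker_pools();
            if (force_parallelize) {
                /* Caller insists on parallelization even if autodetection
                 * would have disabled it; fall back to a default pool size. */
                can_parallelize = true;
                if (pool_size < 4) {
                    pool_size = 4;
                }
            }
            ovs_mutex_unlock(&init_mutex);
        }
    }

can_parallelize_hashes() would then shrink to a plain query of
'can_parallelize', and add_worker_pool() could drop its own copy of the
initialization block.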
> + > +struct worker_pool *add_worker_pool(void *(*start)(void *)){ > + > + struct worker_pool *new_pool = NULL; > + struct worker_control *new_control; > + bool test = false; > + int i; > + char sem_name[256]; > + > + > + if (atomic_compare_exchange_strong( > + &initial_pool_setup, > + &test, > + true)) { > + ovs_mutex_lock(&init_mutex); > + setup_worker_pools(); > + ovs_mutex_unlock(&init_mutex); > + } > + > + ovs_mutex_lock(&init_mutex); > + if (can_parallelize) { > + new_pool = xmalloc(sizeof(struct worker_pool)); > + new_pool->size = pool_size; > + sprintf(sem_name, MAIN_SEM_NAME, sembase, new_pool); > + new_pool->done = sem_open(sem_name, O_CREAT, S_IRWXU, 0); > + > + ovs_list_push_back(&worker_pools, &new_pool->list_node); > + > + new_pool->controls = > + xmalloc(sizeof(struct worker_control) * new_pool->size); > + > + for (i = 0; i < new_pool->size; i++) { > + new_control = &new_pool->controls[i]; > + sprintf(sem_name, WORKER_SEM_NAME, sembase, new_pool, i); > + new_control->fire = sem_open(sem_name, O_CREAT, S_IRWXU, 0); > + new_control->id = i; > + new_control->done = new_pool->done; > + new_control->data = NULL; > + ovs_mutex_init(&new_control->mutex); > + new_control->finished = ATOMIC_VAR_INIT(false); > + } > + > + for (i = 0; i < pool_size; i++) { > + ovs_thread_create("worker pool helper", start, > &new_pool->controls[i]); > + } > + } > + ovs_mutex_unlock(&init_mutex); > + return new_pool; > +} > + > + > +/* Initializes 'hmap' as an empty hash table with mask N. */ > +void > +fast_hmap_init(struct hmap *hmap, ssize_t mask) > +{ > + size_t i; > + > + hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1)); > + hmap->one = NULL; > + hmap->mask = mask; > + hmap->n = 0; > + for (i = 0; i <= hmap->mask; i++) { > + hmap->buckets[i] = NULL; > + } > +} > + > +/* Initializes 'hmap' as an empty hash table of size X. > + * Intended for use in parallel processing so that all > + * fragments used to store results in a parallel job > + * are the same size. > + */ > +void > +fast_hmap_size_for(struct hmap *hmap, int size) > +{ > + size_t mask; > + mask = size / 2; > + mask |= mask >> 1; > + mask |= mask >> 2; > + mask |= mask >> 4; > + mask |= mask >> 8; > + mask |= mask >> 16; > +#if SIZE_MAX > UINT32_MAX > + mask |= mask >> 32; > +#endif > + > + /* If we need to dynamically allocate buckets we might as well allocate > at > + * least 4 of them. */ > + mask |= (mask & 1) << 1; > + > + fast_hmap_init(hmap, mask); > +} > + > +/* Run a thread pool which uses a callback function to process results > + */ > + > +void run_pool_callback(struct worker_pool *pool, > + void *fin_result, void *result_frags, > + void (*helper_func)(struct worker_pool *pool, > + void *fin_result, > + void *result_frags, int index)) > +{ > + int index, completed; > + > + /* Ensure that all worker threads see the same data as the > + * main thread. > + */ > + > + atomic_thread_fence(memory_order_acq_rel); > + > + /* Start workers */ > + > + for (index = 0; index < pool->size; index++) { > + sem_post(pool->controls[index].fire); > + } > + > + completed = 0; > + > + do { > + bool test; > + /* Note - we do not loop on semaphore until it reaches > + * zero, but on pool size/remaining workers. > + * This is by design. If the inner loop can handle > + * completion for more than one worker within an iteration > + * it will do so to ensure no additional iterations and > + * waits once all of them are done. 
> + * > + * This may result in us having an initial positive value > + * of the semaphore when the pool is invoked the next time. > + * This is harmless - the loop will spin up a couple of times > + * doing nothing while the workers are processing their data > + * slices. > + */ > + sem_wait(pool->done); > + for (index = 0; index < pool->size; index++) { > + test = true; > + /* If the worker has marked its data chunk as complete, > + * invoke the helper function to combine the results of > + * this worker into the main result. > + * > + * The worker must invoke an appropriate memory fence > + * (most likely acq_rel) to ensure that the main thread > + * sees all of the results produced by the worker. > + */ > + if (atomic_compare_exchange_weak( > + &pool->controls[index].finished, > + &test, > + false)) { > + if (helper_func) { > + (helper_func)(pool, fin_result, result_frags, index); > + } > + completed++; > + pool->controls[index].data = NULL; > + } > + } > + } while (completed < pool->size); > +} > + > +/* Run a thread pool - basic, does not do results processing. > + */ > + > +void run_pool(struct worker_pool *pool) > +{ > + run_pool_callback(pool, NULL, NULL, NULL); > +} > + > +/* Brute force merge of a hashmap into another hashmap. > + * Intended for use in parallel processing. The destination > + * hashmap MUST be the same size as the one being merged. > + * > + * This can be achieved by pre-allocating them to correct size > + * and using hmap_insert_fast() instead of hmap_insert() > + */ > + > +void fast_hmap_merge(struct hmap *dest, struct hmap *inc) > +{ > + size_t i; > + > + ovs_assert(inc->mask == dest->mask); > + > + if (!inc->n) { > + /* Request to merge an empty frag, nothing to do */ > + return; > + } > + > + for (i = 0; i <= dest->mask; i++) { > + struct hmap_node **dest_bucket = &dest->buckets[i]; > + struct hmap_node **inc_bucket = &inc->buckets[i]; > + if (*inc_bucket != NULL) { > + struct hmap_node *last_node = *inc_bucket; > + while (last_node->next != NULL) { > + last_node = last_node->next; > + } > + last_node->next = *dest_bucket; > + *dest_bucket = *inc_bucket; > + *inc_bucket = NULL; > + } > + } > + dest->n += inc->n; > + inc->n = 0; > +} > + > +/* Run a thread pool which gathers results in an array > + * of hashes. Merge results. > + */ > + > +static void merge_hash_results(struct worker_pool *pool OVS_UNUSED, > + void *fin_result, void *result_frags, > + int index) > +{ > + struct hmap *result = (struct hmap *)fin_result; > + struct hmap *res_frags = (struct hmap *)result_frags; > + > + fast_hmap_merge(result, &res_frags[index]); > + hmap_destroy(&res_frags[index]); > +} > + > + > +void run_pool_hash( > + struct worker_pool *pool, > + struct hmap *result, > + struct hmap *result_frags) > +{ > + run_pool_callback(pool, result, result_frags, merge_hash_results); > +} > + > +/* Run a thread pool which gathers results in an array of lists. > + * Merge results. 
> + */ > + > +static void merge_list_results(struct worker_pool *pool OVS_UNUSED, > + void *fin_result, void *result_frags, > + int index) > +{ > + struct ovs_list *result = (struct ovs_list *)fin_result; > + struct ovs_list *res_frags = (struct ovs_list *)result_frags; > + > + if (!ovs_list_is_empty(&res_frags[index])) { > + ovs_list_splice(result->next, > + ovs_list_front(&res_frags[index]), &res_frags[index]); > + } > +} > + > + > +void run_pool_list( > + struct worker_pool *pool, > + struct ovs_list *result, > + struct ovs_list *result_frags) > +{ > + run_pool_callback(pool, result, result_frags, merge_list_results); > +} > + > +void update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl) > +{ > + int i; > + if (hrl->mask != lflows->mask) { > + if (hrl->row_locks) { > + free(hrl->row_locks); > + } > + hrl->row_locks = xcalloc(sizeof(struct ovs_mutex), lflows->mask + 1); > + hrl->mask = lflows->mask; > + for (i = 0; i <= lflows->mask; i++) { > + ovs_mutex_init(&hrl->row_locks[i]); > + } > + } > +} > diff --git a/lib/parallel-hmap.h b/lib/parallel-hmap.h > new file mode 100644 > index 000000000..771bf583a > --- /dev/null > +++ b/lib/parallel-hmap.h > @@ -0,0 +1,238 @@ > +/* > + * Copyright (c) 2020 Red Hat, Inc. > + * > + * Licensed under the Apache License, Version 2.0 (the "License"); > + * you may not use this file except in compliance with the License. > + * You may obtain a copy of the License at: > + * > + * http://www.apache.org/licenses/LICENSE-2.0 > + * > + * Unless required by applicable law or agreed to in writing, software > + * distributed under the License is distributed on an "AS IS" BASIS, > + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. > + * See the License for the specific language governing permissions and > + * limitations under the License. > + */ > + > +#ifndef OVS_PARALLEL_HMAP > +#define OVS_PARALLEL_HMAP 1 > + > +/* if the parallel macros are defined by hmap.h or any other ovs define > + * we skip over the ovn specific definitions. > + */ > + > +#ifdef __cplusplus > +extern "C" { > +#endif > + > +#include <stdbool.h> > +#include <stdlib.h> > +#include <semaphore.h> > +#include "openvswitch/util.h" > +#include "openvswitch/hmap.h" > +#include "openvswitch/thread.h" > +#include "ovs-atomic.h" > + > +#ifdef __clang__ > +#pragma clang diagnostic push > +#pragma clang diagnostic ignored "-Wthread-safety" > +#endif Why is this required ? I see that if I comment I see compilation errors with clang. I would suggest adding some comments on why it is required if it is required. > +/* A version of the HMAP_FOR_EACH macro intended for iterating as part > + * of parallel processing. > + * Each worker thread has a different ThreadID in the range of 0..POOL_SIZE > + * and will iterate hash buckets ThreadID, ThreadID + step, > + * ThreadID + step * 2, etc. The actual macro accepts > + * ThreadID + step * i as the JOBID parameter. > + */ > + > +#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \ > + for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); > \ > + (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \ > + || ((NODE = NULL), false); \ > + ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER)) > + > +/* We do not have a SAFE version of the macro, because the hash size is not > + * atomic and hash removal operations would need to be wrapped with > + * locks. This will defeat most of the benefits from doing anything in > + * parallel. 
> + * If the code block inside FOR_EACH_IN_PARALLEL needs to remove elements, > + * each thread should store them in a temporary list result instead, merging > + * the lists into a combined result at the end */ > + > +/* Work "Handle" */ > + > +struct worker_control { > + int id; /* Used as a modulo when iterating over a hash. */ > + atomic_bool finished; /* Set to true after achunk of work is complete. */ > + sem_t *fire; /* Work start semaphore - sem_post starts the worker. */ > + sem_t *done; /* Work completion semaphore - sem_post on completion. */ > + struct ovs_mutex mutex; /* Guards the data. */ > + void *data; /* Pointer to data to be processed. */ > + void *workload; /* back-pointer to the worker pool structure. */ > +}; > + > +struct worker_pool { > + int size; /* Number of threads in the pool. */ > + struct ovs_list list_node; /* List of pools - used in cleanup/exit. */ > + struct worker_control *controls; /* "Handles" in this pool. */ > + sem_t *done; /* Work completion semaphorew. */ > +}; > + > +/* Add a worker pool for thread function start() which expects a pointer to > + * a worker_control structure as an argument. */ > + > +struct worker_pool *add_worker_pool(void *(*start)(void *)); > + > +/* Setting this to true will make all processing threads exit */ > + > +bool stop_parallel_processing(void); > + > +/* Build a hmap pre-sized for size elements */ > + > +void fast_hmap_size_for(struct hmap *hmap, int size); > + > +/* Build a hmap with a mask equals to size */ > + > +void fast_hmap_init(struct hmap *hmap, ssize_t size); > + > +/* Brute-force merge a hmap into hmap. > + * Dest and inc have to have the same mask. The merge is performed > + * by extending the element list for bucket N in the dest hmap with the list > + * from bucket N in inc. > + */ > + > +void fast_hmap_merge(struct hmap *dest, struct hmap *inc); > + > +/* Run a pool, without any default processing of results. > + */ > + > +void run_pool(struct worker_pool *pool); > + > +/* Run a pool, merge results from hash frags into a final hash result. > + * The hash frags must be pre-sized to the same size. > + */ > + > +void run_pool_hash(struct worker_pool *pool, > + struct hmap *result, struct hmap *result_frags); > + > +/* Run a pool, merge results from list frags into a final list result. > + */ > + > +void run_pool_list(struct worker_pool *pool, > + struct ovs_list *result, struct ovs_list *result_frags); > + > +/* Run a pool, call a callback function to perform processing of results. > + */ > + > +void run_pool_callback(struct worker_pool *pool, void *fin_result, > + void *result_frags, > + void (*helper_func)(struct worker_pool *pool, > + void *fin_result, void > *result_frags, > + int index)); > + > + > +/* Returns the first node in 'hmap' in the bucket in which the given 'hash' > + * would land, or a null pointer if that bucket is empty. */ > + > +static inline struct hmap_node * > +hmap_first_in_bucket_num(const struct hmap *hmap, size_t num) > +{ > + return hmap->buckets[num]; > +} > + > +static inline struct hmap_node * > +parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size) > +{ > + size_t i; > + for (i = start; i <= hmap->mask; i+= pool_size) { > + struct hmap_node *node = hmap->buckets[i]; > + if (node) { > + return node; > + } > + } > + return NULL; > +} > + > +/* Returns the first node in 'hmap', as expected by thread with job_id > + * for parallel processing in arbitrary order, or a null pointer if > + * the slice of 'hmap' for that job_id is empty. 
*/ > +static inline struct hmap_node * > +parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size) > +{ > + return parallel_hmap_next__(hmap, job_id, pool_size); > +} > + > +/* Returns the next node in the slice of 'hmap' following 'node', > + * in arbitrary order, or a * null pointer if 'node' is the last node in > + * the 'hmap' slice. > + * > + */ > +static inline struct hmap_node * > +parallel_hmap_next(const struct hmap *hmap, > + const struct hmap_node *node, ssize_t pool_size) > +{ > + return (node->next > + ? node->next > + : parallel_hmap_next__(hmap, > + (node->hash & hmap->mask) + pool_size, pool_size)); > +} > + > +static inline void post_completed_work(struct worker_control *control) > +{ > + atomic_thread_fence(memory_order_acq_rel); > + atomic_store_relaxed(&control->finished, true); > + sem_post(control->done); > +} > + > +static inline void wait_for_work(struct worker_control *control) > +{ > + sem_wait(control->fire); > +} > + > +/* Hash per-row locking support - to be used only in conjunction > + * with fast hash inserts. Normal hash inserts may resize the hash > + * rendering the locking invalid. > + */ > + > +struct hashrow_locks { > + ssize_t mask; > + struct ovs_mutex *row_locks; > +}; > + > +/* Update an hash row locks structure to match the current hash size */ > + > +void update_hashrow_locks(struct hmap *lflows, struct hashrow_locks *hrl); > + > +/* Lock a hash row */ > + > +static inline void lock_hash_row(struct hashrow_locks *hrl, uint32_t hash) > +{ > + ovs_mutex_lock(&hrl->row_locks[hash % hrl->mask]); > +} > + > +/* Unlock a hash row */ > + > +static inline void unlock_hash_row(struct hashrow_locks *hrl, uint32_t hash) > +{ > + ovs_mutex_unlock(&hrl->row_locks[hash % hrl->mask]); > +} > +/* Init the row locks structure */ > + > +static inline void init_hash_row_locks(struct hashrow_locks *hrl) > +{ > + hrl->mask = 0; > + hrl->row_locks = NULL; > +} > + > +bool can_parallelize_hashes(void); > + > +#ifdef __clang__ > +#pragma clang diagnostic pop > +#endif > + > +#ifdef __cplusplus > +} > +#endif > + > + > +#endif /* lib/ovs-fasthmap.h */ > -- > 2.20.1 > > _______________________________________________ > dev mailing list > [email protected] > https://mail.openvswitch.org/mailman/listinfo/ovs-dev > _______________________________________________ dev mailing list [email protected] https://mail.openvswitch.org/mailman/listinfo/ovs-dev
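For completeness, a rough end-to-end sketch of how a caller might drive the API
above (the record type, the job struct and the worker body below are
hypothetical; only the parallel-hmap and lib/hmap calls come from the patch and
the existing hmap code):

    struct rec {                      /* hypothetical element type */
        struct hmap_node node;
        uint32_t hash;
    };

    struct job {                      /* hypothetical per-run work description */
        const struct hmap *input;     /* shared, read-only input hash */
        struct hmap *frags;           /* one output frag per worker */
        int pool_size;
    };

    static void *
    example_worker(void *arg)
    {
        struct worker_control *control = arg;
        struct job *job;
        struct rec *r, *copy;
        size_t bucket;

        while (!stop_parallel_processing()) {
            wait_for_work(control);
            if (stop_parallel_processing()) {
                break;
            }
            job = control->data;
            if (job) {
                /* Walk buckets id, id + M, id + 2M, ... of the shared input
                 * and build this thread's results in its own pre-sized frag. */
                for (bucket = control->id; bucket <= job->input->mask;
                     bucket += job->pool_size) {
                    HMAP_FOR_EACH_IN_PARALLEL (r, node, bucket, job->input) {
                        copy = xmalloc(sizeof *copy);
                        copy->hash = r->hash;
                        hmap_insert_fast(&job->frags[control->id], &copy->node,
                                         copy->hash);
                    }
                }
            }
            post_completed_work(control);
        }
        return NULL;
    }

    /* Hypothetical caller; "input" is already populated and "result" receives
     * the merged output.  A real caller would check can_parallelize_hashes()
     * and fall back to a serial path if add_worker_pool() returns NULL. */
    static void
    example_run(const struct hmap *input, struct hmap *result)
    {
        struct worker_pool *pool = add_worker_pool(example_worker);
        struct hmap *frags = xmalloc(sizeof *frags * pool->size);
        struct job job = { .input = input, .frags = frags,
                           .pool_size = pool->size };

        fast_hmap_size_for(result, hmap_count(input));
        for (int i = 0; i < pool->size; i++) {
            /* Frags must end up with the same mask as the final result. */
            fast_hmap_size_for(&frags[i], hmap_count(input));
            pool->controls[i].data = &job;
        }
        run_pool_hash(pool, result, frags);  /* fires workers, merges frags */
        free(frags);
    }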
