From: Anton Ivanov <[email protected]> Adds functionality needed to walk a hash in parallel, where thread ID N out of a pool of size M is responsible for processing all elements in buckets N, N+M, N+2*M, etc.
Signed-off-by: Anton Ivanov <[email protected]>
---
 lib/automake.mk |   2 +
 lib/fasthmap.c  | 367 ++++++++++++++++++++++++++++++++++++++++++++++++
 lib/fasthmap.h  | 146 +++++++++++++++++++
 3 files changed, 515 insertions(+)
 create mode 100644 lib/fasthmap.c
 create mode 100644 lib/fasthmap.h

diff --git a/lib/automake.mk b/lib/automake.mk
index 86940ccd2..dc1f8c29e 100644
--- a/lib/automake.mk
+++ b/lib/automake.mk
@@ -95,6 +95,8 @@ lib_libopenvswitch_la_SOURCES = \
 	lib/dynamic-string.c \
 	lib/entropy.c \
 	lib/entropy.h \
+	lib/fasthmap.c \
+	lib/fasthmap.h \
 	lib/fat-rwlock.c \
 	lib/fat-rwlock.h \
 	lib/fatal-signal.c \
diff --git a/lib/fasthmap.c b/lib/fasthmap.c
new file mode 100644
index 000000000..23f6e3cb3
--- /dev/null
+++ b/lib/fasthmap.c
@@ -0,0 +1,367 @@
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ * Copyright (c) 2008, 2009, 2010, 2012, 2013, 2015, 2019 Nicira, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <config.h>
+#include <stdint.h>
+#include <string.h>
+#include <semaphore.h>
+#include "fatal-signal.h"
+#include "util.h"
+#include "openvswitch/vlog.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "fasthmap.h"
+#include "ovs-atomic.h"
+#include "ovs-thread.h"
+#include "ovs-numa.h"
+
+VLOG_DEFINE_THIS_MODULE(fasthmap);
+
+/* Global worker-pool state.  Pool creation is serialized by 'init_mutex'. */
+static bool worker_pool_setup = false;
+static bool workers_must_exit = false;
+
+static struct ovs_list worker_pools = OVS_LIST_INITIALIZER(&worker_pools);
+
+static struct ovs_mutex init_mutex = OVS_MUTEX_INITIALIZER;
+
+static int pool_size;
+
+/* Fatal-signal hook: asks every worker to exit and wakes all of them. */
+static void worker_pool_hook(void *aux OVS_UNUSED) {
+    int i;
+    struct worker_pool *pool;
+    workers_must_exit = true; /* all workers must honour this flag */
+    atomic_thread_fence(memory_order_release);
+    LIST_FOR_EACH (pool, list_node, &worker_pools) {
+        for (i = 0; i < pool->size ; i++) {
+            sem_post(&pool->controls[i].fire);
+        }
+    }
+}
+
+/* Sizes pools to cores per NUMA node (at least 1), or 4 if unknown. */
+static void setup_worker_pools(void) {
+    int cores, nodes;
+
+    nodes = ovs_numa_get_n_numas();
+    if (nodes == OVS_NUMA_UNSPEC || nodes <= 0) {
+        nodes = 1;
+    }
+    cores = ovs_numa_get_n_cores();
+    if (cores == OVS_CORE_UNSPEC || cores <= 0) {
+        pool_size = 4;
+    } else {
+        pool_size = MAX(cores / nodes, 1); /* 0 would hang run_pool(). */
+    }
+    fatal_signal_add_hook(worker_pool_hook, NULL, NULL, true);
+    worker_pool_setup = true;
+}
+
+/* Returns true once the workers have been told to exit. */
+bool seize_fire(void)
+{
+    return workers_must_exit;
+}
+
+/* Creates and registers a pool whose thread i runs start(&controls[i]). */
+struct worker_pool *
+add_worker_pool(void *(*start)(void *))
+{
+    struct worker_pool *new_pool = NULL;
+    struct worker_control *new_control;
+    int i;
+
+    ovs_mutex_lock(&init_mutex);
+    if (!worker_pool_setup) {
+        setup_worker_pools();
+    }
+
+    new_pool = xmalloc(sizeof *new_pool);
+    new_pool->size = pool_size;
+    sem_init(&new_pool->done, 0, 0);
+    ovs_list_push_back(&worker_pools, &new_pool->list_node);
+    new_pool->controls = xcalloc(new_pool->size, sizeof *new_pool->controls);
+
+    for (i = 0; i < new_pool->size; i++) {
+        new_control = &new_pool->controls[i];
+        sem_init(&new_control->fire, 0, 0);
+        new_control->id = i;
+        new_control->done = &new_pool->done;
+        new_control->data = NULL;
+        ovs_mutex_init(&new_control->mutex);
+        atomic_init(&new_control->finished, false);
+    }
+
+    for (i = 0; i < new_pool->size; i++) {
+        ovs_thread_create("worker pool helper", start, &new_pool->controls[i]);
+    }
+    ovs_mutex_unlock(&init_mutex);
+    return new_pool;
+}
+
+/* Initializes 'hmap' as an empty hash table with 'mask' + 1 buckets. */
+void
+fast_hmap_init(struct hmap *hmap, ssize_t mask)
+{
+    size_t i;
+
+    hmap->buckets = xmalloc(sizeof (struct hmap_node *) * (mask + 1));
+    hmap->one = NULL;
+    hmap->mask = mask;
+    hmap->n = 0;
+    for (i = 0; i <= hmap->mask; i++) {
+        hmap->buckets[i] = NULL;
+    }
+}
+
+/* Initializes 'hmap' as an empty hash table whose number of buckets depends
+ * only on 'size'.  Intended for use in parallel processing: all fragments
+ * used to store the results of a parallel job can be created with the same
+ * 'size', giving them the same mask, as fast_hmap_merge() requires.
+ */
+void
+fast_hmap_size_for(struct hmap *hmap, int size)
+{
+    size_t mask;
+    mask = size / 2;
+    mask |= mask >> 1;
+    mask |= mask >> 2;
+    mask |= mask >> 4;
+    mask |= mask >> 8;
+    mask |= mask >> 16;
+#if SIZE_MAX > UINT32_MAX
+    mask |= mask >> 32;
+#endif
+
+    /* If we need to dynamically allocate buckets we might as well allocate at
+     * least 4 of them. */
+    mask |= (mask & 1) << 1;
+
+    fast_hmap_init(hmap, mask);
+}
+
+/* Run a thread pool - basic, does not do results processing.
+ * Wakes every worker of 'pool' and blocks until each one's 'finished' flag
+ * has been seen set; the flag is then cleared and the worker's 'data' reset.
+ */
+void run_pool(struct worker_pool *pool)
+{
+    int index, completed;
+
+    atomic_thread_fence(memory_order_release);
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            /* Strong CAS: a spurious failure of the weak form could miss the
+             * last finished worker and leave sem_wait() blocked forever. */
+            if (atomic_compare_exchange_strong(
+                    &pool->controls[index].finished, &test, false)) {
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+/* Merges all nodes of 'inc' into 'dest' by splicing each bucket chain of
+ * 'inc' onto the front of the matching chain of 'dest'.  Afterwards 'inc'
+ * is empty but still owns its bucket array.  Intended for use in parallel
+ * processing.  Both maps MUST have the same mask; this can be achieved by
+ * pre-allocating them with the same fast_hmap_size_for() size and using
+ * hmap_insert_fast() instead of hmap_insert().
+ */
+void fast_hmap_merge(struct hmap *dest, struct hmap *inc)
+{
+    size_t i;
+
+    ovs_assert(inc->mask == dest->mask);
+
+    if (!inc->n) {
+        /* Request to merge an empty frag, nothing to do */
+        return;
+    }
+
+    for (i = 0; i <= dest->mask; i++) {
+        struct hmap_node **dest_bucket = &dest->buckets[i];
+        struct hmap_node **inc_bucket = &inc->buckets[i];
+        if (*inc_bucket != NULL) {
+            struct hmap_node *last_node = *inc_bucket;
+            while (last_node->next != NULL) {
+                last_node = last_node->next;
+            }
+            last_node->next = *dest_bucket;
+            *dest_bucket = *inc_bucket;
+            *inc_bucket = NULL;
+        }
+    }
+    dest->n += inc->n;
+    inc->n = 0;
+}
+
+/* Run a thread pool which gathers results in an array
+ * of hashes. Merge results.
+ * As each worker finishes, its fragment 'result_frags[index]' is merged into
+ * 'result' with fast_hmap_merge() and then destroyed.
+ */
+void run_pool_hash(
+    struct worker_pool *pool,
+    struct hmap *result,
+    struct hmap *result_frags)
+{
+    int index, completed;
+
+    atomic_thread_fence(memory_order_release);
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            /* Strong CAS: a spurious weak failure could hang sem_wait(). */
+            if (atomic_compare_exchange_strong(
+                    &pool->controls[index].finished, &test, false)) {
+                fast_hmap_merge(result, &result_frags[index]);
+                hmap_destroy(&result_frags[index]);
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+/* Appends list 'inc' to '*dest'.  A NULL 'inc' is ignored; a NULL '*dest',
+ * or an empty '*dest' when 'inc' is non-empty, is replaced by 'inc'.
+ * Otherwise the elements of 'inc' are moved to the tail of '*dest'. */
+void merge_lists(struct ovs_list **dest, struct ovs_list *inc)
+{
+    struct ovs_list *last, *first;
+    if (inc == NULL) {
+        return;
+    }
+
+    if (*dest == NULL) {
+        *dest = inc;
+        return;
+    }
+
+    if (ovs_list_is_empty(inc)) {
+        return;
+    }
+
+    if (ovs_list_is_empty(*dest)) {
+        *dest = inc;
+        return;
+    }
+
+    last = inc->prev;
+    /* first element is not the list pointer itself, it is the ->next */
+    first = inc->next;
+
+    (*dest)->prev->next = first;
+    first->prev = (*dest)->prev;
+
+    (*dest)->prev = last;
+    last->next = *dest;
+    ovs_list_init(inc);  /* Drop stale links to the moved nodes. */
+}
+
+/* Runs a thread pool which gathers results in an array of lists.  As each
+ * worker finishes, 'result_frags[index]' is appended to '*result' with
+ * merge_lists().
+ */
+void run_pool_list(
+    struct worker_pool *pool,
+    struct ovs_list **result,
+    struct ovs_list **result_frags)
+{
+    int index, completed;
+
+    atomic_thread_fence(memory_order_release);
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            /* Strong CAS: a spurious weak failure could hang sem_wait(). */
+            if (atomic_compare_exchange_strong(
+                    &pool->controls[index].finished, &test, false)) {
+                merge_lists(result, result_frags[index]);
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
+
+/* Runs a thread pool which uses a callback function to process results:
+ * helper_func(pool, fin_result, index) is called as soon as worker 'index'
+ * is seen to have finished, before its 'data' is reset to NULL.
+ */
+void run_pool_callback(
+    struct worker_pool *pool,
+    void *fin_result,
+    void (*helper_func)(
+        struct worker_pool *pool, void *fin_result, int index))
+{
+    int index, completed;
+
+    atomic_thread_fence(memory_order_release);
+
+    for (index = 0; index < pool->size; index++) {
+        sem_post(&pool->controls[index].fire);
+    }
+
+    completed = 0;
+
+    do {
+        bool test;
+        sem_wait(&pool->done);
+        for (index = 0; index < pool->size; index++) {
+            test = true;
+            /* Strong CAS: a spurious weak failure could hang sem_wait(). */
+            if (atomic_compare_exchange_strong(
+                    &pool->controls[index].finished, &test, false)) {
+                (helper_func)(pool, fin_result, index);
+                completed++;
+                pool->controls[index].data = NULL;
+            }
+        }
+    } while (completed < pool->size);
+}
diff --git a/lib/fasthmap.h b/lib/fasthmap.h
new file mode 100644
index 000000000..947900801
--- /dev/null
+++ b/lib/fasthmap.h
@@ -0,0 +1,146 @@
+/*
+ * Copyright (c) 2020 Red Hat, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at:
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef FAST_HMAP_H
+#define FAST_HMAP_H 1
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+#include <stdbool.h>
+#include <stdlib.h>
+#include <semaphore.h>
+#include "openvswitch/util.h"
+#include "openvswitch/hmap.h"
+#include "openvswitch/thread.h"
+#include "ovs-atomic.h"
+
+/* Iterates over the nodes in bucket number JOBID of HMAP, and only that
+ * bucket.  JOBID must not exceed HMAP's mask. */
+#define HMAP_FOR_EACH_IN_PARALLEL(NODE, MEMBER, JOBID, HMAP) \
+    for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER); \
+         (NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
+         || ((NODE = NULL), false); \
+         ASSIGN_CONTAINER(NODE, hmap_next_in_bucket(&(NODE)->MEMBER), MEMBER))
+
+/* Safe when NODE may be freed (not needed when NODE may be removed from the
+ * hash map but its members remain accessible and intact). */
+#define HMAP_FOR_EACH_IN_PARALLEL_SAFE(NODE, NEXT, MEMBER, JOBID, HMAP) \
+    HMAP_FOR_EACH_SAFE_PARALLEL_INIT(NODE, NEXT, MEMBER, JOBID, HMAP, (void) 0)
+
+#define HMAP_FOR_EACH_SAFE_PARALLEL_INIT(NODE, NEXT, \
+    MEMBER, JOBID, HMAP, ...) \
+    for (INIT_CONTAINER(NODE, hmap_first_in_bucket_num(HMAP, JOBID), MEMBER), \
+         __VA_ARGS__; \
+         ((NODE != OBJECT_CONTAINING(NULL, NODE, MEMBER)) \
+          || ((NODE = NULL), false) \
+          ? INIT_CONTAINER(NEXT, hmap_next_in_bucket(&(NODE)->MEMBER), \
+                           MEMBER), 1 : 0); \
+         (NODE) = (NEXT))
+
+struct worker_control {
+    int id;
+    int size;
+    atomic_bool finished;
+    sem_t fire;
+    sem_t *done;
+    struct ovs_mutex mutex;
+    void *data;
+    void *workload;
+};
+
+struct worker_pool {
+    int size;
+    struct ovs_list list_node;
+    struct worker_control *controls;
+    sem_t done;
+};
+
+struct worker_pool *add_worker_pool(void *(*start)(void *));
+
+bool seize_fire(void);
+void fast_hmap_size_for(struct hmap *hmap, int size);
+void fast_hmap_init(struct hmap *hmap, ssize_t mask);
+void fast_hmap_merge(struct hmap *dest, struct hmap *inc);
+void merge_lists(struct ovs_list **dest, struct ovs_list *inc);
+
+void run_pool(struct worker_pool *pool);
+void run_pool_hash(
+    struct worker_pool *pool, struct hmap *result, struct hmap *result_frags);
+void run_pool_list(struct worker_pool *pool, struct ovs_list **result,
+                   struct ovs_list **result_frags);
+void run_pool_callback(
+    struct worker_pool *pool,
+    void *fin_result,
+    void (*helper_func)(
+        struct worker_pool *pool, void *fin_result, int index));
+
+/* Returns the first node in bucket number 'num' of 'hmap', or a null pointer
+ * if that bucket is empty.  'num' must not exceed 'hmap->mask'. */
+static inline struct hmap_node *
+hmap_first_in_bucket_num(const struct hmap *hmap, size_t num)
+{
+    return hmap->buckets[num];
+}
+
+/* Returns the first node found in buckets 'start', 'start' + 'pool_size',
+ * 'start' + 2 * 'pool_size', ... of 'hmap', or a null pointer if all of those
+ * buckets are empty.  'pool_size' must be nonzero. */
+static inline struct hmap_node *
+parallel_hmap_next__(const struct hmap *hmap, size_t start, size_t pool_size)
+{
+    size_t i;
+    for (i = start; i <= hmap->mask; i += pool_size) {
+        struct hmap_node *node = hmap->buckets[i];
+        if (node) {
+            return node;
+        }
+    }
+    return NULL;
+}
+
+/* Returns the first node of the slice of 'hmap' handled by job 'job_id' out
+ * of 'pool_size' jobs (buckets 'job_id', 'job_id' + 'pool_size', ...), or a
+ * null pointer if that slice is empty.
+ */
+static inline struct hmap_node *
+parallel_hmap_first(const struct hmap *hmap, size_t job_id, size_t pool_size)
+{
+    return parallel_hmap_next__(hmap, job_id, pool_size);
+}
+
+/* Returns the node following 'node' in the slice of 'hmap' handled by the
+ * job that owns 'node''s bucket, in arbitrary order, or a null pointer if
+ * 'node' is the last node of that slice.  'pool_size' must match the value
+ * passed to parallel_hmap_first(). */
+static inline struct hmap_node *
+parallel_hmap_next(
+    const struct hmap *hmap,
+    const struct hmap_node *node,
+    size_t pool_size)
+{
+    return (node->next
+            ? node->next
+            : parallel_hmap_next__(hmap,
+                                   (node->hash & hmap->mask) + pool_size,
+                                   pool_size));
+}
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif /* lib/fasthmap.h */
-- 
2.20.1

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev
