Bhanuprakash Bodireddy <bhanuprakash.bodire...@intel.com> writes: > This commit initializes the keepalive subsystem and spawns keepalive > thread that wakes up at regular intervals to update the timestamp and > status of pmd cores in shared memory. > > This patch implements POSIX shared memory object for storing the events > that will be read by monitoring framework. Also implements APIs to read > the keepalive settings from OVSDB. > > Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodire...@intel.com> > --- > lib/netdev-dpdk.c | 231 > +++++++++++++++++++++++++++++++++++++++++++++++++++++- > lib/netdev-dpdk.h | 7 +- > 2 files changed, 236 insertions(+), 2 deletions(-) > > diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c > index ddc651b..c308ef5 100644 > --- a/lib/netdev-dpdk.c > +++ b/lib/netdev-dpdk.c > @@ -22,6 +22,8 @@ > #include <stdlib.h> > #include <errno.h> > #include <unistd.h> > +#include <sys/mman.h> > +#include <fcntl.h> > > #include <rte_config.h> > #include <rte_cycles.h> > @@ -32,6 +34,7 @@ > #include <rte_mbuf.h> > #include <rte_meter.h> > #include <rte_virtio_net.h> > +#include <rte_keepalive.h> > > #include "dirs.h" > #include "dp-packet.h" > @@ -48,8 +51,9 @@ > #include "ovs-numa.h" > #include "ovs-thread.h" > #include "ovs-rcu.h" > -#include "packets.h" > #include "openvswitch/shash.h" > +#include "packets.h" > +#include "process.h" > #include "smap.h" > #include "sset.h" > #include "unaligned.h" > @@ -402,6 +406,29 @@ struct netdev_rxq_dpdk { > int port_id; > }; > > +/* > + * OVS Shared Memory structure > + * > + * The information in the shared memory block will be read by collectd. > + * */ > +struct dpdk_keepalive_shm { > + /* IPC semaphore. Posted when a core dies */ > + sem_t core_died; > + > + /* > + * Relayed status of each core. > + * UNUSED[0], ALIVE[1], DEAD[2], GONE[3], MISSING[4], DOZING[5], SLEEP[6] > + **/ > + enum rte_keepalive_state core_state[RTE_KEEPALIVE_MAXCORES];
What is 'DOZING'? What is 'MISSING'? Where are these states defined, and what does each of them mean? And what is the difference between DEAD and GONE?
> + */ > +static void > +dpdk_ka_relay_core_state(void *ptr_data, const int core_id, > + const enum rte_keepalive_state core_state, uint64_t last_alive) > +{ > + struct dpdk_keepalive_shm *ka_shm = (struct dpdk_keepalive_shm > *)ptr_data; > + int count; > + > + if (!ka_shm) { > + VLOG_ERR("KeepAlive: Invalid shared memory block\n"); > + return; > + } > + > + VLOG_DBG_RL(&rl, > + "TS(%lu):CORE%d, old state:%d, current_state:%d\n", > + (unsigned long)time(NULL),core_id,ka_shm->core_state[core_id], > + core_state); > + > + switch (core_state) { > + case RTE_KA_STATE_ALIVE: > + case RTE_KA_STATE_MISSING: > + ka_shm->core_state[core_id] = RTE_KA_STATE_ALIVE; > + ka_shm->core_last_seen_times[core_id] = last_alive; > + break; > + case RTE_KA_STATE_DOZING: > + case RTE_KA_STATE_SLEEP: > + case RTE_KA_STATE_DEAD: > + case RTE_KA_STATE_GONE: > + ka_shm->core_state[core_id] = core_state; > + ka_shm->core_last_seen_times[core_id] = last_alive; > + break; > + case RTE_KA_STATE_UNUSED: > + ka_shm->core_state[core_id] = RTE_KA_STATE_UNUSED; > + break; > + } > + > + if (OVS_UNLIKELY(core_state == RTE_KA_STATE_DEAD)) { > + /* To handle inactive collectd, increment the semaphore > + * if count is '0'. */ This whole mechanism seems very error prone. Is it possible to hang a thread with the subsequent sem_post? > + if (sem_getvalue(&ka_shm->core_died, &count) == -1) { > + VLOG_WARN("Semaphore check failed\n"); > + return; > + } > + > + if (count > 1) { > + return; > + } > + > + VLOG_DBG("Posting semaphore\n"); > + if (sem_post(&ka_shm->core_died) != 0) { > + VLOG_ERR("Failed to increment semaphore\n"); > + } > + } > +} > + > +/* Create POSIX Shared memory object and initialize the semaphore. 
*/ > +static > +struct dpdk_keepalive_shm *dpdk_keepalive_shm_create(void) > +{ > + int fd; > + int coreid; > + struct dpdk_keepalive_shm *ka_shm; > + char ka_shmblk[40]; > + > + sprintf(ka_shmblk, "%s", dpdk_get_ka_shm()); > + if (shm_unlink(ka_shmblk) == -1 && errno != ENOENT) { > + VLOG_ERR("Error unlinking stale %s \n", ka_shmblk); > + } > + > + if ((fd = shm_open(ka_shmblk, > + O_CREAT | O_TRUNC | O_RDWR, 0666)) < 0) { > + VLOG_WARN("Failed to open %s as SHM \n", ka_shmblk); > + } else if (ftruncate(fd, sizeof(struct dpdk_keepalive_shm)) != 0) { > + VLOG_WARN("Failed to resize SHM \n"); > + } else { > + ka_shm = (struct dpdk_keepalive_shm *) mmap( > + 0, sizeof(struct dpdk_keepalive_shm), > + PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0); > + close(fd); > + if (ka_shm == MAP_FAILED) { > + VLOG_WARN("Failed to mmap SHM \n"); > + } else { > + memset(ka_shm, 0, sizeof(struct dpdk_keepalive_shm)); > + > + /* Initialize the semaphores for IPC/SHM use */ > + if (sem_init(&ka_shm->core_died, 1, 0) != 0) { > + VLOG_WARN("Failed to setup SHM semaphore \n"); > + return NULL; > + } > + > + /* Mark all cores to 'not present' */ > + for (coreid = 0; coreid < RTE_KEEPALIVE_MAXCORES; coreid++) { > + ka_shm->core_state[coreid] = RTE_KA_STATE_UNUSED; > + ka_shm->core_last_seen_times[coreid] = 0; > + } > + > + return ka_shm; > + } > + } > + return NULL; > +} > + > +/* Initialize Keepalive sub-system and register callback. 
*/ > +int > +keepalive_init(void) > +{ > + if (!dpdk_is_ka_enabled()) { > + VLOG_INFO("KeepAlive Disabled\n"); > + return -1; > + } > + > + /* Create shared memory block */ > + ka_shm = dpdk_keepalive_shm_create(); > + if (ka_shm == NULL) { > + VLOG_ERR("dpdk_keepalive_shm_create() failed\n"); > + return -1; > + } > + > + /* Initialize keepalive subsystem */ > + if ((rte_global_keepalive_info = > + rte_keepalive_create(&dpdk_failcore_cb, ka_shm)) == NULL) { > + VLOG_ERR("Keepalive initialization failed\n"); > + return -1; > + } else { > + rte_keepalive_register_relay_callback(rte_global_keepalive_info, > + dpdk_ka_relay_core_state, ka_shm); > + } > + > + return 0; > +} > + > +/* Keepalive thread. */ > +static void * > +ovs_keepalive(void *dummy OVS_UNUSED) > +{ > + uint32_t keepalive_timer_interval_ms = dpdk_get_ka_interval(); > + pthread_detach(pthread_self()); > + > + for (;;) { > + rte_keepalive_dispatch_pings(NULL, rte_global_keepalive_info); > + xusleep(keepalive_timer_interval_ms * 1000); > + } > + > + return NULL; > +} > + > static void > check_link_status(struct netdev_dpdk *dev) > { > @@ -2747,6 +2970,12 @@ netdev_dpdk_class_init(void) > * needs to be done only once */ > if (ovsthread_once_start(&once)) { > ovs_thread_create("dpdk_watchdog", dpdk_watchdog, NULL); > + > + int err = keepalive_init(); > + if (!err) { > + ovs_thread_create("ovs_keepalive", ovs_keepalive, NULL); > + } > + > unixctl_command_register("netdev-dpdk/set-admin-state", > "[netdev] up|down", 1, 2, > netdev_dpdk_set_admin_state, NULL); > diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h > index b7d02a7..ccb13fd 100644 > --- a/lib/netdev-dpdk.h > +++ b/lib/netdev-dpdk.h > @@ -18,7 +18,7 @@ > #define NETDEV_DPDK_H > > #include <config.h> > - > +#include <semaphore.h> > #include "openvswitch/compiler.h" > > struct dp_packet; > @@ -28,6 +28,11 @@ struct dp_packet; > void netdev_dpdk_register(void); > void free_dpdk_buf(struct dp_packet *); > > +int keepalive_init(void); > +void 
keepalive_create_thread(void); > +void dpdk_ka_get_tid(unsigned); > + > +struct rte_keepalive *rte_global_keepalive_info; > #else > > static inline void _______________________________________________ dev mailing list d...@openvswitch.org https://mail.openvswitch.org/mailman/listinfo/ovs-dev