Bhanuprakash Bodireddy <bhanuprakash.bodire...@intel.com> writes:

> This commit initializes the keepalive subsystem and spawns a keepalive
> thread that wakes up at regular intervals to update the timestamp and
> status of PMD cores in shared memory.
>
> This patch implements a POSIX shared memory object for storing the events
> that will be read by the monitoring framework. It also implements APIs to
> read the keepalive settings from OVSDB.
>
> Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodire...@intel.com>
> ---
>  lib/netdev-dpdk.c | 231 
> +++++++++++++++++++++++++++++++++++++++++++++++++++++-
>  lib/netdev-dpdk.h |   7 +-
>  2 files changed, 236 insertions(+), 2 deletions(-)
>
> diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
> index ddc651b..c308ef5 100644
> --- a/lib/netdev-dpdk.c
> +++ b/lib/netdev-dpdk.c
> @@ -22,6 +22,8 @@
>  #include <stdlib.h>
>  #include <errno.h>
>  #include <unistd.h>
> +#include <sys/mman.h>
> +#include <fcntl.h>
>  
>  #include <rte_config.h>
>  #include <rte_cycles.h>
> @@ -32,6 +34,7 @@
>  #include <rte_mbuf.h>
>  #include <rte_meter.h>
>  #include <rte_virtio_net.h>
> +#include <rte_keepalive.h>
>  
>  #include "dirs.h"
>  #include "dp-packet.h"
> @@ -48,8 +51,9 @@
>  #include "ovs-numa.h"
>  #include "ovs-thread.h"
>  #include "ovs-rcu.h"
> -#include "packets.h"
>  #include "openvswitch/shash.h"
> +#include "packets.h"
> +#include "process.h"
>  #include "smap.h"
>  #include "sset.h"
>  #include "unaligned.h"
> @@ -402,6 +406,29 @@ struct netdev_rxq_dpdk {
>      int port_id;
>  };
>  
> +/*
> + * OVS Shared Memory structure
> + *
> + * The information in the shared memory block will be read by collectd.
> + * */
> +struct dpdk_keepalive_shm {
> +    /* IPC semaphore. Posted when a core dies */
> +    sem_t core_died;
> +
> +    /*
> +     * Relayed status of each core.
> +     * UNUSED[0], ALIVE[1], DEAD[2], GONE[3], MISSING[4], DOZING[5], SLEEP[6]
> +     **/
> +    enum rte_keepalive_state core_state[RTE_KEEPALIVE_MAXCORES];

What do the 'DOZING' and 'MISSING' states mean?  Where are these states
defined and documented?  What do DEAD and GONE mean, and how do they differ?

> +    /* Last seen timestamp of the core */
> +    uint64_t core_last_seen_times[RTE_KEEPALIVE_MAXCORES];
> +
> +    /* Store pmd thread tid */
> +    pid_t thread_id[RTE_KEEPALIVE_MAXCORES];
> +};
> +
> +static struct dpdk_keepalive_shm *ka_shm;
>  static int netdev_dpdk_class_init(void);
>  static int netdev_dpdk_vhost_class_init(void);
>  
> @@ -586,6 +613,202 @@ netdev_dpdk_mempool_configure(struct netdev_dpdk *dev)
>      return 0;
>  }
>  
> +void
> +dpdk_ka_get_tid(unsigned core_idx)
> +{
> +    uint32_t tid = rte_sys_gettid();
> +
> +    if (dpdk_is_ka_enabled() && ka_shm) {
> +            ka_shm->thread_id[core_idx] = tid;
> +    }
> +}
> +
> +/* Callback function invoked on heartbeat miss.  Verify if it is genuine
> + * heartbeat miss or a false positive and log the message accordingly.
> + */
> +static void
> +dpdk_failcore_cb(void *ptr_data, const int core_id)
> +{
> +    struct dpdk_keepalive_shm *ka_shm = (struct dpdk_keepalive_shm 
> *)ptr_data;
> +
> +    if (ka_shm) {
> +        int pstate;
> +        uint32_t tid = ka_shm->thread_id[core_id];
> +        int err = get_process_status(tid, &pstate);
> +
> +        if (!err) {
> +            switch (pstate) {
> +
> +            case ACTIVE_STATE:
> +                VLOG_INFO_RL(&rl,"False positive, pmd tid[%"PRIu32"] 
> alive\n",
> +                                  tid);

Can this mechanism report false positives?  If so, doesn't that diminish its usefulness?

> +                break;
> +            case STOPPED_STATE:
> +            case TRACED_STATE:
> +            case DEFUNC_STATE:
> +            case UNINTERRUPTIBLE_SLEEP_STATE:
> +                VLOG_WARN_RL(&rl,
> +                    "PMD tid[%"PRIu32"] on core[%d] is unresponsive\n",
> +                    tid, core_id);
> +                break;
> +            default:
> +                VLOG_DBG("%s: The process state: %d\n", __FUNCTION__, 
> pstate);
> +                OVS_NOT_REACHED();
> +            }
> +        }
> +    }
> +}
> +
> +/* Notify the external monitoring application for change in core state.
> + *
> + * On a consecutive heartbeat miss the core is considered dead and the status
> + * is relayed to monitoring framework by unlocking the semaphore.
> + */
> +static void
> +dpdk_ka_relay_core_state(void *ptr_data, const int core_id,
> +       const enum rte_keepalive_state core_state, uint64_t last_alive)
> +{
> +    struct dpdk_keepalive_shm *ka_shm = (struct dpdk_keepalive_shm 
> *)ptr_data;
> +    int count;
> +
> +    if (!ka_shm) {
> +        VLOG_ERR("KeepAlive: Invalid shared memory block\n");
> +        return;
> +    }
> +
> +    VLOG_DBG_RL(&rl,
> +               "TS(%lu):CORE%d, old state:%d, current_state:%d\n",
> +               (unsigned long)time(NULL),core_id,ka_shm->core_state[core_id],
> +               core_state);
> +
> +    switch (core_state) {
> +    case RTE_KA_STATE_ALIVE:
> +    case RTE_KA_STATE_MISSING:
> +        ka_shm->core_state[core_id] = RTE_KA_STATE_ALIVE;
> +        ka_shm->core_last_seen_times[core_id] = last_alive;
> +        break;
> +    case RTE_KA_STATE_DOZING:
> +    case RTE_KA_STATE_SLEEP:
> +    case RTE_KA_STATE_DEAD:
> +    case RTE_KA_STATE_GONE:
> +        ka_shm->core_state[core_id] = core_state;
> +        ka_shm->core_last_seen_times[core_id] = last_alive;
> +        break;
> +    case RTE_KA_STATE_UNUSED:
> +        ka_shm->core_state[core_id] = RTE_KA_STATE_UNUSED;
> +        break;
> +    }
> +
> +    if (OVS_UNLIKELY(core_state == RTE_KA_STATE_DEAD)) {
> +        /* To handle inactive collectd, increment the semaphore
> +         * if count is '0'. */

This whole mechanism seems very error-prone.  Could the subsequent
sem_post() call hang this thread?

> +        if (sem_getvalue(&ka_shm->core_died, &count) == -1) {
> +            VLOG_WARN("Semaphore check failed\n");
> +            return;
> +        }
> +
> +        if (count > 1) {
> +            return;
> +        }
> +
> +        VLOG_DBG("Posting semaphore\n");
> +        if (sem_post(&ka_shm->core_died) != 0) {
> +            VLOG_ERR("Failed to increment semaphore\n");
> +        }
> +    }
> +}
> +
> +/* Create POSIX Shared memory object and initialize the semaphore. */
> +static
> +struct dpdk_keepalive_shm *dpdk_keepalive_shm_create(void)
> +{
> +    int fd;
> +    int coreid;
> +    struct dpdk_keepalive_shm *ka_shm;
> +    char ka_shmblk[40];
> +
> +    sprintf(ka_shmblk, "%s", dpdk_get_ka_shm());
> +    if (shm_unlink(ka_shmblk) == -1 && errno != ENOENT) {
> +        VLOG_ERR("Error unlinking stale %s \n", ka_shmblk);
> +    }
> +
> +    if ((fd = shm_open(ka_shmblk,
> +           O_CREAT | O_TRUNC | O_RDWR, 0666)) < 0) {
> +        VLOG_WARN("Failed to open %s as SHM \n", ka_shmblk);
> +    } else if (ftruncate(fd, sizeof(struct dpdk_keepalive_shm)) != 0) {
> +        VLOG_WARN("Failed to resize SHM \n");
> +    } else {
> +        ka_shm = (struct dpdk_keepalive_shm *) mmap(
> +           0, sizeof(struct dpdk_keepalive_shm),
> +            PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
> +        close(fd);
> +        if (ka_shm == MAP_FAILED) {
> +            VLOG_WARN("Failed to mmap SHM \n");
> +        } else {
> +            memset(ka_shm, 0, sizeof(struct dpdk_keepalive_shm));
> +
> +            /* Initialize the semaphores for IPC/SHM use */
> +            if (sem_init(&ka_shm->core_died, 1, 0) != 0) {
> +                VLOG_WARN("Failed to setup SHM semaphore \n");
> +                return NULL;
> +            }
> +
> +            /* Mark all cores to 'not present' */
> +            for (coreid = 0; coreid < RTE_KEEPALIVE_MAXCORES; coreid++) {
> +                ka_shm->core_state[coreid] = RTE_KA_STATE_UNUSED;
> +                ka_shm->core_last_seen_times[coreid] = 0;
> +            }
> +
> +            return ka_shm;
> +        }
> +    }
> +    return NULL;
> +}
> +
> +/* Initialize Keepalive sub-system and register callback. */
> +int
> +keepalive_init(void)
> +{
> +    if (!dpdk_is_ka_enabled()) {
> +       VLOG_INFO("KeepAlive Disabled\n");
> +       return -1;
> +    }
> +
> +    /* Create shared memory block */
> +    ka_shm = dpdk_keepalive_shm_create();
> +    if (ka_shm == NULL) {
> +        VLOG_ERR("dpdk_keepalive_shm_create() failed\n");
> +        return -1;
> +    }
> +
> +    /* Initialize keepalive subsystem */
> +    if ((rte_global_keepalive_info =
> +            rte_keepalive_create(&dpdk_failcore_cb, ka_shm)) == NULL) {
> +        VLOG_ERR("Keepalive initialization failed\n");
> +        return -1;
> +    } else {
> +        rte_keepalive_register_relay_callback(rte_global_keepalive_info,
> +            dpdk_ka_relay_core_state, ka_shm);
> +    }
> +
> +    return 0;
> +}
> +
> +/* Keepalive thread. */
> +static void *
> +ovs_keepalive(void *dummy OVS_UNUSED)
> +{
> +    uint32_t keepalive_timer_interval_ms = dpdk_get_ka_interval();
> +    pthread_detach(pthread_self());
> +
> +    for (;;) {
> +        rte_keepalive_dispatch_pings(NULL, rte_global_keepalive_info);
> +        xusleep(keepalive_timer_interval_ms * 1000);
> +    }
> +
> +    return NULL;
> +}
> +
>  static void
>  check_link_status(struct netdev_dpdk *dev)
>  {
> @@ -2747,6 +2970,12 @@ netdev_dpdk_class_init(void)
>       * needs to be done only once */
>      if (ovsthread_once_start(&once)) {
>          ovs_thread_create("dpdk_watchdog", dpdk_watchdog, NULL);
> +
> +        int err = keepalive_init();
> +        if (!err) {
> +            ovs_thread_create("ovs_keepalive", ovs_keepalive, NULL);
> +        }
> +
>          unixctl_command_register("netdev-dpdk/set-admin-state",
>                                   "[netdev] up|down", 1, 2,
>                                   netdev_dpdk_set_admin_state, NULL);
> diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
> index b7d02a7..ccb13fd 100644
> --- a/lib/netdev-dpdk.h
> +++ b/lib/netdev-dpdk.h
> @@ -18,7 +18,7 @@
>  #define NETDEV_DPDK_H
>  
>  #include <config.h>
> -
> +#include <semaphore.h>
>  #include "openvswitch/compiler.h"
>  
>  struct dp_packet;
> @@ -28,6 +28,11 @@ struct dp_packet;
>  void netdev_dpdk_register(void);
>  void free_dpdk_buf(struct dp_packet *);
>  
> +int keepalive_init(void);
> +void keepalive_create_thread(void);
> +void dpdk_ka_get_tid(unsigned);
> +
> +struct rte_keepalive *rte_global_keepalive_info;
>  #else
>  
>  static inline void
_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to