This commit initializes the keepalive subsystem and spawns a keepalive
thread that wakes up at regular intervals to update the timestamp and
status of the PMD cores in shared memory.

This patch implements a POSIX shared memory object for storing the events
that will be read by the monitoring framework. It also implements APIs to
read the keepalive settings from OVSDB.

Signed-off-by: Bhanuprakash Bodireddy <bhanuprakash.bodire...@intel.com>
---
 lib/netdev-dpdk.c | 231 +++++++++++++++++++++++++++++++++++++++++++++++++++++-
 lib/netdev-dpdk.h |   7 +-
 2 files changed, 236 insertions(+), 2 deletions(-)

diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
index ddc651b..c308ef5 100644
--- a/lib/netdev-dpdk.c
+++ b/lib/netdev-dpdk.c
@@ -22,6 +22,8 @@
 #include <stdlib.h>
 #include <errno.h>
 #include <unistd.h>
+#include <sys/mman.h>
+#include <fcntl.h>
 
 #include <rte_config.h>
 #include <rte_cycles.h>
@@ -32,6 +34,7 @@
 #include <rte_mbuf.h>
 #include <rte_meter.h>
 #include <rte_virtio_net.h>
+#include <rte_keepalive.h>
 
 #include "dirs.h"
 #include "dp-packet.h"
@@ -48,8 +51,9 @@
 #include "ovs-numa.h"
 #include "ovs-thread.h"
 #include "ovs-rcu.h"
-#include "packets.h"
 #include "openvswitch/shash.h"
+#include "packets.h"
+#include "process.h"
 #include "smap.h"
 #include "sset.h"
 #include "unaligned.h"
@@ -402,6 +406,29 @@ struct netdev_rxq_dpdk {
     int port_id;
 };
 
+/*
+ * OVS keepalive shared memory layout.
+ *
+ * The information in the shared memory block will be read by collectd.
+ */
+struct dpdk_keepalive_shm {
+    /* Process-shared IPC semaphore.  Posted when a core is reported dead. */
+    sem_t core_died;
+
+    /*
+     * Relayed status of each core, indexed by core id.
+     * UNUSED[0], ALIVE[1], DEAD[2], GONE[3], MISSING[4], DOZING[5], SLEEP[6]
+     */
+    enum rte_keepalive_state core_state[RTE_KEEPALIVE_MAXCORES];
+
+    /* Last seen timestamp of each core, indexed by core id. */
+    uint64_t core_last_seen_times[RTE_KEEPALIVE_MAXCORES];
+
+    /* Thread id of the pmd thread recorded for each core. */
+    pid_t thread_id[RTE_KEEPALIVE_MAXCORES];
+};
+
+static struct dpdk_keepalive_shm *ka_shm;
 static int netdev_dpdk_class_init(void);
 static int netdev_dpdk_vhost_class_init(void);
 
@@ -586,6 +613,202 @@ netdev_dpdk_mempool_configure(struct netdev_dpdk *dev)
     return 0;
 }
 
+/* Records the calling thread's id in the keepalive shared memory slot of
+ * core 'core_idx', so that a heartbeat miss on that core can later be
+ * matched to a thread.
+ *
+ * Does nothing when keepalive is disabled, when the shared memory block has
+ * not been set up, or when 'core_idx' is outside the per-core table. */
+void
+dpdk_ka_get_tid(unsigned core_idx)
+{
+    if (!dpdk_is_ka_enabled() || !ka_shm) {
+        return;
+    }
+
+    /* thread_id[] has RTE_KEEPALIVE_MAXCORES slots; never write past it,
+     * the block is shared with another process. */
+    if (core_idx >= RTE_KEEPALIVE_MAXCORES) {
+        return;
+    }
+
+    ka_shm->thread_id[core_idx] = rte_sys_gettid();
+}
+
+/* Callback invoked on a heartbeat miss for 'core_id'.  'ptr_data' is the
+ * keepalive shared memory block.
+ *
+ * Queries the state of the thread recorded for the core to tell a genuine
+ * heartbeat miss from a false positive, and logs the result.  This is a
+ * monitoring path, so no outcome here aborts the process. */
+static void
+dpdk_failcore_cb(void *ptr_data, const int core_id)
+{
+    /* Named 'shm' so that it does not shadow the file-scope 'ka_shm'. */
+    struct dpdk_keepalive_shm *shm = ptr_data;
+    uint32_t tid;
+    int pstate;
+
+    if (!shm) {
+        return;
+    }
+
+    /* 'core_id' indexes thread_id[]; reject anything outside the table. */
+    if (core_id < 0 || core_id >= RTE_KEEPALIVE_MAXCORES) {
+        VLOG_WARN_RL(&rl, "KeepAlive: core id %d out of range", core_id);
+        return;
+    }
+
+    tid = shm->thread_id[core_id];
+    if (get_process_status(tid, &pstate)) {
+        /* Without a state there is nothing to classify; say so instead of
+         * dropping the heartbeat miss silently. */
+        VLOG_WARN_RL(&rl, "Failed to read state of pmd tid[%"PRIu32"] "
+                     "on core[%d]", tid, core_id);
+        return;
+    }
+
+    switch (pstate) {
+    case ACTIVE_STATE:
+        VLOG_INFO_RL(&rl,"False positive, pmd tid[%"PRIu32"] alive\n", tid);
+        break;
+    case STOPPED_STATE:
+    case TRACED_STATE:
+    case DEFUNC_STATE:
+    case UNINTERRUPTIBLE_SLEEP_STATE:
+        VLOG_WARN_RL(&rl,
+                     "PMD tid[%"PRIu32"] on core[%d] is unresponsive\n",
+                     tid, core_id);
+        break;
+    default:
+        /* A state outside the list above is reported, not asserted on:
+         * the value comes from the running system, and aborting the whole
+         * daemon from a monitoring callback would be worse than the miss. */
+        VLOG_WARN_RL(&rl, "%s: The process state: %d\n", __FUNCTION__,
+                     pstate);
+        break;
+    }
+}
+
+/* Relay callback: publishes the state of 'core_id' to the keepalive shared
+ * memory block 'ptr_data' for the external monitoring application.
+ *
+ * ALIVE and MISSING are both published as ALIVE; DOZING, SLEEP, DEAD and
+ * GONE are published unchanged.  All of these also store 'last_alive' as
+ * the core's last seen time.  UNUSED only updates the state.
+ *
+ * When the core is DEAD the 'core_died' semaphore is posted as well, to
+ * notify the monitoring application. */
+static void
+dpdk_ka_relay_core_state(void *ptr_data, const int core_id,
+       const enum rte_keepalive_state core_state, uint64_t last_alive)
+{
+    /* Named 'shm' so that it does not shadow the file-scope 'ka_shm'. */
+    struct dpdk_keepalive_shm *shm = ptr_data;
+    int count;
+
+    if (!shm) {
+        VLOG_ERR("KeepAlive: Invalid shared memory block\n");
+        return;
+    }
+
+    /* 'core_id' indexes the per-core tables in shared memory; reject
+     * anything outside them before the first access. */
+    if (core_id < 0 || core_id >= RTE_KEEPALIVE_MAXCORES) {
+        VLOG_WARN_RL(&rl, "KeepAlive: core id %d out of range", core_id);
+        return;
+    }
+
+    VLOG_DBG_RL(&rl,
+               "TS(%lu):CORE%d, old state:%d, current_state:%d\n",
+               (unsigned long)time(NULL),core_id,shm->core_state[core_id],
+               core_state);
+
+    switch (core_state) {
+    case RTE_KA_STATE_ALIVE:
+    case RTE_KA_STATE_MISSING:
+        shm->core_state[core_id] = RTE_KA_STATE_ALIVE;
+        shm->core_last_seen_times[core_id] = last_alive;
+        break;
+    case RTE_KA_STATE_DOZING:
+    case RTE_KA_STATE_SLEEP:
+    case RTE_KA_STATE_DEAD:
+    case RTE_KA_STATE_GONE:
+        shm->core_state[core_id] = core_state;
+        shm->core_last_seen_times[core_id] = last_alive;
+        break;
+    case RTE_KA_STATE_UNUSED:
+        shm->core_state[core_id] = RTE_KA_STATE_UNUSED;
+        break;
+    }
+
+    if (OVS_UNLIKELY(core_state == RTE_KA_STATE_DEAD)) {
+        /* The reader may be inactive, so bound the number of pending
+         * posts: post only while the count is 0 or 1. */
+        if (sem_getvalue(&shm->core_died, &count) == -1) {
+            VLOG_WARN("Semaphore check failed\n");
+            return;
+        }
+
+        if (count > 1) {
+            return;
+        }
+
+        VLOG_DBG("Posting semaphore\n");
+        if (sem_post(&shm->core_died) != 0) {
+            VLOG_ERR("Failed to increment semaphore\n");
+        }
+    }
+}
+
+/* Creates the POSIX shared memory object named by dpdk_get_ka_shm(), sizes
+ * and maps it, then initializes its contents: the process-shared 'core_died'
+ * semaphore starts at 0 and every core is marked RTE_KA_STATE_UNUSED.
+ *
+ * A stale object with the same name is unlinked first.
+ *
+ * Returns the mapped block, or NULL on failure.  On failure no descriptor
+ * is left open and no mapping is left in place. */
+static struct dpdk_keepalive_shm *
+dpdk_keepalive_shm_create(void)
+{
+    /* Named 'shm' so that it does not shadow the file-scope 'ka_shm'. */
+    struct dpdk_keepalive_shm *shm;
+    const char *name = dpdk_get_ka_shm();
+    char ka_shmblk[40];
+    int coreid;
+    int len;
+    int fd;
+
+    if (!name) {
+        VLOG_WARN("No name configured for the keepalive SHM block");
+        return NULL;
+    }
+
+    /* Bounded copy: a name that does not fit is an error, not a reason to
+     * write past the end of 'ka_shmblk'. */
+    len = snprintf(ka_shmblk, sizeof ka_shmblk, "%s", name);
+    if (len < 0 || (size_t) len >= sizeof ka_shmblk) {
+        VLOG_WARN("Keepalive SHM block name is too long");
+        return NULL;
+    }
+
+    if (shm_unlink(ka_shmblk) == -1 && errno != ENOENT) {
+        VLOG_ERR("Error unlinking stale %s \n", ka_shmblk);
+    }
+
+    fd = shm_open(ka_shmblk, O_CREAT | O_TRUNC | O_RDWR, 0666);
+    if (fd < 0) {
+        VLOG_WARN("Failed to open %s as SHM \n", ka_shmblk);
+        return NULL;
+    }
+
+    if (ftruncate(fd, sizeof *shm) != 0) {
+        VLOG_WARN("Failed to resize SHM \n");
+        close(fd);
+        return NULL;
+    }
+
+    shm = mmap(NULL, sizeof *shm, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
+    /* The descriptor is not needed once mmap() has been attempted. */
+    close(fd);
+    if (shm == MAP_FAILED) {
+        VLOG_WARN("Failed to mmap SHM \n");
+        return NULL;
+    }
+
+    memset(shm, 0, sizeof *shm);
+
+    /* Initialize the semaphore for IPC/SHM use (pshared = 1, value = 0). */
+    if (sem_init(&shm->core_died, 1, 0) != 0) {
+        VLOG_WARN("Failed to setup SHM semaphore \n");
+        munmap(shm, sizeof *shm);
+        return NULL;
+    }
+
+    /* Mark all cores to 'not present'. */
+    for (coreid = 0; coreid < RTE_KEEPALIVE_MAXCORES; coreid++) {
+        shm->core_state[coreid] = RTE_KA_STATE_UNUSED;
+        shm->core_last_seen_times[coreid] = 0;
+    }
+
+    return shm;
+}
+
+/* Initializes the keepalive subsystem: creates the shared memory block,
+ * creates the keepalive instance with dpdk_failcore_cb() as its failure
+ * callback, and registers dpdk_ka_relay_core_state() as the relay callback.
+ * Both callbacks receive the shared memory block as their data pointer.
+ *
+ * Returns 0 on success, -1 if keepalive is disabled or a step fails.  On
+ * failure 'ka_shm' is left NULL. */
+int
+keepalive_init(void)
+{
+    if (!dpdk_is_ka_enabled()) {
+        VLOG_INFO("KeepAlive Disabled\n");
+        return -1;
+    }
+
+    /* Create shared memory block. */
+    ka_shm = dpdk_keepalive_shm_create();
+    if (!ka_shm) {
+        VLOG_ERR("dpdk_keepalive_shm_create() failed\n");
+        return -1;
+    }
+
+    /* Initialize keepalive subsystem. */
+    rte_global_keepalive_info = rte_keepalive_create(&dpdk_failcore_cb,
+                                                     ka_shm);
+    if (!rte_global_keepalive_info) {
+        VLOG_ERR("Keepalive initialization failed\n");
+        /* Drop the mapping and clear the pointer, so that the rest of the
+         * file does not treat a half-initialized subsystem as usable. */
+        munmap(ka_shm, sizeof *ka_shm);
+        ka_shm = NULL;
+        return -1;
+    }
+
+    rte_keepalive_register_relay_callback(rte_global_keepalive_info,
+                                          dpdk_ka_relay_core_state, ka_shm);
+
+    return 0;
+}
+
+/* Keepalive thread body.
+ *
+ * Reads the keepalive interval once, detaches itself, and then loops
+ * forever: dispatch the keepalive pings, sleep for one interval, repeat.
+ * The interval is scaled by 1000 before being handed to xusleep(). */
+static void *
+ovs_keepalive(void *dummy OVS_UNUSED)
+{
+    uint32_t interval_ms = dpdk_get_ka_interval();
+    uint32_t sleep_us;
+
+    pthread_detach(pthread_self());
+
+    sleep_us = interval_ms * 1000;
+    while (1) {
+        rte_keepalive_dispatch_pings(NULL, rte_global_keepalive_info);
+        xusleep(sleep_us);
+    }
+
+    return NULL;
+}
+
 static void
 check_link_status(struct netdev_dpdk *dev)
 {
@@ -2747,6 +2970,12 @@ netdev_dpdk_class_init(void)
      * needs to be done only once */
     if (ovsthread_once_start(&once)) {
         ovs_thread_create("dpdk_watchdog", dpdk_watchdog, NULL);
+
+        int err = keepalive_init();
+        if (!err) {
+            ovs_thread_create("ovs_keepalive", ovs_keepalive, NULL);
+        }
+
         unixctl_command_register("netdev-dpdk/set-admin-state",
                                  "[netdev] up|down", 1, 2,
                                  netdev_dpdk_set_admin_state, NULL);
diff --git a/lib/netdev-dpdk.h b/lib/netdev-dpdk.h
index b7d02a7..ccb13fd 100644
--- a/lib/netdev-dpdk.h
+++ b/lib/netdev-dpdk.h
@@ -18,7 +18,7 @@
 #define NETDEV_DPDK_H
 
 #include <config.h>
-
+#include <semaphore.h>
 #include "openvswitch/compiler.h"
 
 struct dp_packet;
@@ -28,6 +28,11 @@ struct dp_packet;
 void netdev_dpdk_register(void);
 void free_dpdk_buf(struct dp_packet *);
 
+/* Keepalive support.  keepalive_init() returns 0 on success and -1 when
+ * keepalive is disabled or initialization fails. */
+int keepalive_init(void);
+/* NOTE(review): no definition of keepalive_create_thread() is visible in
+ * this patch -- confirm that one exists elsewhere in the series. */
+void keepalive_create_thread(void);
+/* Records the calling thread's id for the given core index. */
+void dpdk_ka_get_tid(unsigned);
+
+/* NOTE(review): this is a variable definition in a header, not an 'extern'
+ * declaration, so every file that includes this header carries its own
+ * tentative definition.  Consider 'extern' here with a single definition
+ * in netdev-dpdk.c. */
+struct rte_keepalive *rte_global_keepalive_info;
 #else
 
 static inline void
-- 
2.4.11

_______________________________________________
dev mailing list
d...@openvswitch.org
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to