Implement odp implementation for linux-generic using standard
odp queue API.

Signed-off-by: Maxim Uvarov <[email protected]>
---
 v4: - fixed Anderses comments. (did not use unlikely() for
        init functions. Only for packet processing.
     - checkpatch cleanup;
     - update to the latest ODP head;
     - remove allocation memory for r_p;

 .gitignore                                         |   1 +
 configure.ac                                       |   1 +
 example/Makefile.am                                |   2 +-
 example/generator/odp_generator.c                  |   6 +-
 example/ipc/Makefile.am                            |   6 +
 example/ipc/README                                 |  56 ++
 example/ipc/odp_ipc.c                              | 677 +++++++++++++++++++++
 example/l2fwd/odp_l2fwd.c                          |   6 +-
 example/odp_example/odp_example.c                  |   3 +-
 example/packet/odp_pktio.c                         |   6 +-
 example/timer/odp_timer_test.c                     |   3 +-
 helper/include/odph_ring.h                         |   2 +
 .../linux-generic/include/api/odp_pktio_types.h    |   1 +
 platform/linux-generic/include/api/odp_queue.h     |   2 +
 .../linux-generic/include/api/odp_shared_memory.h  |  12 +-
 .../linux-generic/include/odp_packet_io_internal.h |   1 +
 .../linux-generic/include/odp_queue_internal.h     |  14 +-
 platform/linux-generic/odp_buffer_pool.c           |   3 +-
 platform/linux-generic/odp_crypto.c                |   3 +-
 platform/linux-generic/odp_packet_io.c             |  31 +-
 platform/linux-generic/odp_queue.c                 | 202 +++++-
 platform/linux-generic/odp_ring.c                  |  11 +-
 platform/linux-generic/odp_schedule.c              |   6 +-
 platform/linux-generic/odp_shared_memory.c         |  70 ++-
 test/api_test/odp_shm_test.c                       |   3 +-
 test/api_test/odp_timer_ping.c                     |   3 +-
 26 files changed, 1098 insertions(+), 33 deletions(-)
 create mode 100644 example/ipc/Makefile.am
 create mode 100644 example/ipc/README
 create mode 100644 example/ipc/odp_ipc.c

diff --git a/.gitignore b/.gitignore
index 6a97f17..78cca53 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,6 +32,7 @@ lib/
 obj/
 build/
 odp_example
+odp_ipc
 odp_packet
 odp_packet_netmap
 odp_atomic
diff --git a/configure.ac b/configure.ac
index 76cf638..fa5dea4 100644
--- a/configure.ac
+++ b/configure.ac
@@ -144,6 +144,7 @@ AC_CONFIG_FILES([Makefile
                 platform/linux-keystone2/Makefile
                 platform/linux-dpdk/Makefile
                 example/Makefile
+                example/ipc/Makefile
                 example/generator/Makefile
                 example/l2fwd/Makefile
                 example/odp_example/Makefile
diff --git a/example/Makefile.am b/example/Makefile.am
index 01a3305..1a5a138 100644
--- a/example/Makefile.am
+++ b/example/Makefile.am
@@ -1 +1 @@
-SUBDIRS = generator l2fwd odp_example packet packet_netmap timer
+SUBDIRS = generator l2fwd odp_example packet packet_netmap timer ipc
diff --git a/example/generator/odp_generator.c 
b/example/generator/odp_generator.c
index 65c2034..9b010ab 100644
--- a/example/generator/odp_generator.c
+++ b/example/generator/odp_generator.c
@@ -537,7 +537,8 @@ int main(int argc, char *argv[])
        odp_atomic_init_u64(&counters.icmp);
 
        /* Reserve memory for args from shared mem */
-       args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE);
+       args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE,
+                               ODP_SHM_THREAD);
        if (args == NULL) {
                ODP_ERR("Error: shared mem alloc failed.\n");
                exit(EXIT_FAILURE);
@@ -582,7 +583,8 @@ int main(int argc, char *argv[])
 
        /* Create packet pool */
        pool_base = odp_shm_reserve("shm_packet_pool",
-                                   SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE);
+                                   SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE,
+                                   ODP_SHM_THREAD);
        if (pool_base == NULL) {
                ODP_ERR("Error: packet pool mem alloc failed.\n");
                exit(EXIT_FAILURE);
diff --git a/example/ipc/Makefile.am b/example/ipc/Makefile.am
new file mode 100644
index 0000000..2fd48f7
--- /dev/null
+++ b/example/ipc/Makefile.am
@@ -0,0 +1,6 @@
+include $(top_srcdir)/example/Makefile.inc
+
+bin_PROGRAMS = odp_ipc
+odp_ipc_LDFLAGS = $(AM_LDFLAGS) -static
+
+dist_odp_ipc_SOURCES = odp_ipc.c
diff --git a/example/ipc/README b/example/ipc/README
new file mode 100644
index 0000000..57df942
--- /dev/null
+++ b/example/ipc/README
@@ -0,0 +1,56 @@
+/* Copyright (c) 2014, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+                       ODP IPC example
+
+This example shows how to use queues to exchange packets between different
+processes.
+
+Examples scheme:
+
+ Ping (Machine 1) ---->  odp_ipc app (Machine 2)
+
+Example burst mode:
+./odp_ipc -i eth0 -m 1 -c 1
+On remote host run ping target that runs odp_ipc.
+
+[11492/1] enqueue 1 packets, first buf 7921 size 98/1856, cnt 1
+11490 no valid buffer
+               ring_thread() got buffer from IPC queue size 98/1856
+[11492/1] enqueue 1 packets, first buf 7905 size 98/1856, cnt 2
+11490 no valid buffer
+               ring_thread() got buffer from IPC queue size 98/1856
+[11492/1] enqueue 1 packets, first buf 7889 size 98/1856, cnt 3
+11490 no valid buffer
+               ring_thread() got buffer from IPC queue size 98/1856
+[11492/1] enqueue 1 packets, first buf 7873 size 98/1856, cnt 4
+
+
+Main PID/thread [11492/1] enqueues packets to IPC queue with 
odp_queue_enq_multi(),
+child process thread ring_thread() dequeues packets from ipc queue.
+
+
+Example queue mode:
+
+./odp_ipc -i eth0 -m 1 -c 1
+waiting for packet...
+Enqueue the packet to ipc queue size 98/1856
+waiting for packet...
+15917 no valid buffer
+               ring_thread() got buffer from IPC queue size 98/1856
+Enqueue the packet to ipc queue size 98/1856
+waiting for packet...
+15917 no valid buffer
+               ring_thread() got buffer from IPC queue size 98/1856
+Enqueue the packet to ipc queue size 98/1856
+waiting for packet...
+15917 no valid buffer
+               ring_thread() got buffer from IPC queue size 98/1856
+Enqueue the packet to ipc queue size 98/1856
+waiting for packet...
+
+Thread 15917 moves packets from ingress queue to IPC queue. Other process
+in ring_thread() thread dequeues packets from IPC queue.
diff --git a/example/ipc/odp_ipc.c b/example/ipc/odp_ipc.c
new file mode 100644
index 0000000..0f5d530
--- /dev/null
+++ b/example/ipc/odp_ipc.c
@@ -0,0 +1,677 @@
+/* Copyright (c) 2014, Linaro Limited
+ * All rights reserved.
+ *
+ * SPDX-License-Identifier:     BSD-3-Clause
+ */
+
+/**
+ * @file
+ *
+ * @example odp_ipc.c  ODP IPC queues example application
+ */
+
+#include <stdlib.h>
+#include <string.h>
+#include <getopt.h>
+#include <unistd.h>
+
+#include <odp.h>
+#include <odph_linux.h>
+#include <odph_packet.h>
+#include <odph_eth.h>
+#include <odph_ip.h>
+
+#define MAX_WORKERS            32
+#define SHM_PKT_POOL_SIZE      (512*2048)
+#define SHM_PKT_POOL_BUF_SIZE  1856
+#define MAX_PKT_BURST          16
+
+#define APPL_MODE_PKT_BURST    0
+#define APPL_MODE_PKT_QUEUE    1
+
+#define RING_SIZE 4096
+#define ODP_RING_NAMESIZE 32
+
+#define PRINT_APPL_MODE(x) printf("%s(%i)\n", #x, (x))
+
+/** Get rid of path in filename - only for unix-type paths using '/' */
+#define NO_PATH(file_name) (strrchr((file_name), '/') ? \
+                           strrchr((file_name), '/') + 1 : (file_name))
+/**
+ * Parsed command line application arguments
+ */
+typedef struct {
+       int core_count;
+       int if_count;           /**< Number of interfaces to be used */
+       char **if_names;        /**< Array of pointers to interface names */
+       int mode;               /**< Packet IO mode */
+       int type;               /**< Packet IO type */
+       int fanout;             /**< Packet IO fanout */
+       odp_buffer_pool_t pool; /**< Buffer pool for packet IO */
+} appl_args_t;
+
+/**
+ * Thread specific arguments
+ */
+typedef struct {
+       char *pktio_dev;        /**< Interface name to use */
+       odp_buffer_pool_t pool; /**< Buffer pool for packet IO */
+       int mode;               /**< Thread mode */
+       int type;               /**< Thread i/o type */
+       int fanout;             /**< Thread i/o fanout */
+} thread_args_t;
+
+/**
+ * Grouping of both parsed CL args and thread specific args - alloc together
+ */
+typedef struct {
+       /** Application (parsed) arguments */
+       appl_args_t appl;
+       /** Thread specific arguments */
+       thread_args_t thread[MAX_WORKERS];
+} args_t;
+
+/** Global pointer to args */
+static args_t *args;
+
+/* helper funcs */
+static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len);
+static void parse_args(int argc, char *argv[], appl_args_t *appl_args);
+static void print_info(char *progname, appl_args_t *appl_args);
+static void usage(char *progname);
+
+static void *ring_thread(void *arg ODP_UNUSED)
+{
+       int ret;
+       odp_buffer_t buf;
+       odp_buffer_pool_t pkt_pool;
+       odp_pktio_params_t pktio_ipc_params;
+       odp_pktio_t pktio_ipc;
+       odp_queue_t ipcq_def;
+
+       printf("ODP RING THREAD PID %d\n", getpid());
+
+       pkt_pool = odp_buffer_pool_lookup("packet_pool");
+       if (pkt_pool == ODP_BUFFER_POOL_INVALID) {
+               ODP_ERR("Error: pkt_pool not found\n");
+               return NULL;
+       }
+
+       /* create shared queue between processes*/
+       pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC;
+       pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params);
+       if (pktio_ipc == ODP_PKTIO_INVALID) {
+               ODP_ERR("Error: pktio create failed\n");
+               return NULL;
+       }
+
+       while (1) {
+               ipcq_def = odp_queue_lookup("shared-queue");
+               if (ipcq_def != ODP_QUEUE_INVALID) {
+                       printf("%s() shared-queue found\n", __func__);
+                       break;
+               }
+               sleep(1);
+       }
+
+       ret = odp_pktio_inq_setdef(pktio_ipc, ipcq_def);
+       if (ret != 0) {
+               ODP_ERR("Error: slave thread default ipc-Q setup\n");
+               return NULL;
+       }
+
+       /* In loop take packets from ipc queue and free this buffer */
+       while (1) {
+               buf = odp_queue_deq(ipcq_def);
+               if (odp_unlikely(!odp_buffer_is_valid(buf)))
+                       continue;
+
+               printf("\t\t%s() got buffer from IPC queue size %ld/%ld\n",
+                      __func__,
+                      (unsigned long)odp_packet_get_len(buf),
+                      (unsigned long)odp_buffer_size(buf));
+               odp_buffer_free(buf);
+       }
+
+       /* unreachable */
+       return NULL;
+}
+
+
+/**
+ * Packet IO loopback worker thread using ODP queues
+ *
+ * @param arg  thread arguments of type 'thread_args_t *'
+ */
+static void *pktio_queue_thread(void *arg)
+{
+       int thr;
+       odp_buffer_pool_t pkt_pool;
+       odp_pktio_t pktio;
+       odp_pktio_t pktio_ipc;
+       thread_args_t *thr_args;
+       odp_queue_t inq_def;
+       odp_queue_t ipcq_def;
+       char inq_name[ODP_QUEUE_NAME_LEN];
+       odp_queue_param_t qparam;
+       odp_buffer_t buf;
+       int ret;
+       odp_pktio_params_t params;
+       odp_pktio_params_t pktio_ipc_params;
+       socket_params_t *sock_params = &params.sock_params;
+
+       thr_args = arg;
+
+       thr = odp_thread_id();
+
+       printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr,
+              thr_args->pktio_dev);
+
+       /* lookup ring from its name */
+       /* Lookup the packet pool */
+       pkt_pool = odp_buffer_pool_lookup("packet_pool");
+       if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) {
+               ODP_ERR("  [%02i] Error: pkt_pool not found\n", thr);
+               return NULL;
+       }
+
+       /* Open a packet IO instance for this thread */
+       sock_params->type = thr_args->type;
+       sock_params->fanout = thr_args->fanout;
+       pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, &params);
+       if (pktio == ODP_PKTIO_INVALID) {
+               ODP_ERR("  [%02i] Error: pktio create failed\n", thr);
+               return NULL;
+       }
+
+       /*
+        * Create and set the default INPUT queue associated with the 'pktio'
+        * resource
+        */
+       qparam.sched.prio  = ODP_SCHED_PRIO_DEFAULT;
+       qparam.sched.sync  = ODP_SCHED_SYNC_ATOMIC;
+       qparam.sched.group = ODP_SCHED_GROUP_DEFAULT;
+       snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio);
+       inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0';
+
+       inq_def = odp_queue_create(inq_name, ODP_QUEUE_TYPE_PKTIN, &qparam);
+       if (inq_def == ODP_QUEUE_INVALID) {
+               ODP_ERR("  [%02i] Error: pktio queue creation failed\n", thr);
+               return NULL;
+       }
+
+       ret = odp_pktio_inq_setdef(pktio, inq_def);
+       if (ret != 0) {
+               ODP_ERR("  [%02i] Error: default input-Q setup\n", thr);
+               return NULL;
+       }
+
+       printf("  [%02i] created pktio:%02i, queue mode (ATOMIC queues)\n"
+              "          default pktio%02i-INPUT queue:%u\n",
+               thr, pktio, pktio, inq_def);
+
+       /* create shared queue between processes*/
+       pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC;
+       pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params);
+       if (pktio_ipc == ODP_PKTIO_INVALID) {
+               ODP_ERR("  [%02i] Error: pktio create failed\n", thr);
+               return NULL;
+       }
+       ipcq_def = odp_queue_create("shared-queue",
+                                   ODP_QUEUE_TYPE_IPC, &qparam);
+       if (ipcq_def == ODP_QUEUE_INVALID) {
+               ODP_ERR("  [%02i] Error: pktio queue creation failed\n", thr);
+               return NULL;
+       }
+
+       /* In loop take packets from inq queue and put them to ipc queue */
+       for (;;) {
+               /* Use schedule to get buf from any input queue */
+               printf("waiting for packet...\n");
+               buf = odp_schedule(NULL, ODP_SCHED_WAIT);
+
+               printf("Enqueue the packet to ipc queue size %ld/%ld\n",
+                      (unsigned long)odp_packet_get_len(buf),
+                      (unsigned long)odp_buffer_size(buf));
+
+               odp_queue_enq(ipcq_def, buf);
+       }
+
+/* unreachable */
+}
+
+/**
+ * Packet IO loopback worker thread using bursts from/to IO resources
+ *
+ * @param arg  thread arguments of type 'thread_args_t *'
+ */
+static void *pktio_ifburst_thread(void *arg)
+{
+       int thr;
+       odp_buffer_pool_t pkt_pool;
+       odp_pktio_t pktio;
+       thread_args_t *thr_args;
+       int pkts, pkts_ok;
+       odp_packet_t pkt_tbl[MAX_PKT_BURST];
+       unsigned long pkt_cnt = 0;
+       unsigned long err_cnt = 0;
+       odp_pktio_params_t params;
+       socket_params_t *sock_params = &params.sock_params;
+       int ret;
+
+       odp_pktio_t pktio_ipc;
+       odp_queue_t ipcq_def;
+       char inq_name[ODP_QUEUE_NAME_LEN];
+       odp_queue_param_t qparam;
+       odp_pktio_params_t pktio_ipc_params;
+
+       thr = odp_thread_id();
+       thr_args = arg;
+
+       printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr,
+              thr_args->pktio_dev);
+
+       /* Lookup the packet pool */
+       pkt_pool = odp_buffer_pool_lookup("packet_pool");
+       if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) {
+               ODP_ERR("  [%02i] Error: pkt_pool not found\n", thr);
+               return NULL;
+       }
+
+       /* Open a packet IO instance for this thread */
+       sock_params->type = thr_args->type;
+       sock_params->fanout = thr_args->fanout;
+       pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, &params);
+       if (pktio == ODP_PKTIO_INVALID) {
+               ODP_ERR("  [%02i] Error: pktio create failed.\n", thr);
+               return NULL;
+       }
+
+       printf("  [%02i] created pktio:%02i, burst mode\n",
+              thr, pktio);
+
+       pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC;
+       pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params);
+       if (pktio_ipc == ODP_PKTIO_INVALID) {
+               ODP_ERR("  [%02i] Error: pktio create failed\n", thr);
+               return NULL;
+       }
+
+       qparam.sched.prio  = ODP_SCHED_PRIO_DEFAULT;
+       qparam.sched.sync  = ODP_SCHED_SYNC_ATOMIC;
+       qparam.sched.group = ODP_SCHED_GROUP_DEFAULT;
+       snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio);
+       inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0';
+
+       ipcq_def = odp_queue_create("shared-queue",
+                                   ODP_QUEUE_TYPE_IPC, &qparam);
+       if (ipcq_def == ODP_QUEUE_INVALID) {
+               ODP_ERR("  [%02i] Error: pktio queue creation failed\n", thr);
+               return NULL;
+       }
+
+       /* Loop packets */
+       for (;;) {
+               pkts = odp_pktio_recv(pktio, pkt_tbl, MAX_PKT_BURST);
+               if (pkts > 0) {
+                       /* Drop packets with errors */
+                       pkts_ok = drop_err_pkts(pkt_tbl, pkts);
+                       if (pkts_ok > 0) {
+                               ret = odp_queue_enq_multi(ipcq_def,
+                                                         pkt_tbl, pkts_ok);
+                               pkt_cnt += pkts_ok;
+                               if (ret != 0) {
+                                       ODP_ERR("odp_ring_mp_enqueue_bulk 
fail\n");
+                               } else {
+                                       printf("[%d/%d] enqueue %d packets, 
first buf %d size %ld/%ld, cnt %lu\n",
+                                              getpid(), thr, pkts_ok,
+                                              pkt_tbl[0],
+                                              (unsigned 
long)odp_packet_get_len(pkt_tbl[0]),
+                                              (unsigned 
long)odp_buffer_size(pkt_tbl[0]),
+                                              pkt_cnt);
+                               }
+                       }
+
+                       if (odp_unlikely(pkts_ok != pkts))
+                               ODP_ERR("Dropped frames:%u - err_cnt:%lu\n",
+                                       pkts-pkts_ok, ++err_cnt);
+               }
+       }
+
+/* unreachable */
+}
+
+/**
+ * ODP packet example main function
+ */
+int main(int argc, char *argv[])
+{
+       odph_linux_pthread_t thread_tbl[MAX_WORKERS];
+       odp_buffer_pool_t pool;
+       int thr_id;
+       int num_workers;
+       void *pool_base;
+       int i;
+       int first_core;
+       int core_count;
+
+       /* Init ODP before calling anything else */
+       if (odp_init_global()) {
+               ODP_ERR("Error: ODP global init failed.\n");
+               exit(EXIT_FAILURE);
+       }
+
+       args = malloc(sizeof(args_t));
+       if (args == NULL) {
+               ODP_ERR("Error: shared mem alloc failed.\n");
+               exit(EXIT_FAILURE);
+       }
+       memset(args, 0, sizeof(*args));
+
+       /* Parse and store the application arguments */
+       parse_args(argc, argv, &args->appl);
+
+       /* Print both system and application information */
+       print_info(NO_PATH(argv[0]), &args->appl);
+
+       core_count  = odp_sys_core_count();
+       num_workers = core_count;
+
+       if (args->appl.core_count)
+               num_workers = args->appl.core_count;
+
+       if (num_workers > MAX_WORKERS)
+               num_workers = MAX_WORKERS;
+
+       printf("Num worker threads: %i\n", num_workers);
+
+       /*
+        * By default core #0 runs Linux kernel background tasks.
+        * Start mapping thread from core #1
+        */
+       first_core = 1;
+
+       if (core_count == 1)
+               first_core = 0;
+
+       printf("First core:         %i\n\n", first_core);
+
+       /* Init this thread */
+       thr_id = odp_thread_create(0);
+       odp_init_local(thr_id);
+
+       /* Create packet pool */
+       pool_base = odp_shm_reserve("shm_packet_pool",
+                                   SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE,
+                                   ODP_SHM_PROC);
+       if (pool_base == NULL) {
+               ODP_ERR("Error: packet pool mem alloc failed.\n");
+               exit(EXIT_FAILURE);
+       }
+
+       pool = odp_buffer_pool_create("packet_pool", pool_base,
+                                     SHM_PKT_POOL_SIZE,
+                                     SHM_PKT_POOL_BUF_SIZE,
+                                     ODP_CACHE_LINE_SIZE,
+                                     ODP_BUFFER_TYPE_PACKET);
+       if (pool == ODP_BUFFER_POOL_INVALID) {
+               ODP_ERR("Error: packet pool create failed.\n");
+               exit(EXIT_FAILURE);
+       }
+       odp_buffer_pool_print(pool);
+
+
+       /* Create another process */
+       int f = fork();
+
+       /* Create and init worker threads */
+       memset(thread_tbl, 0, sizeof(thread_tbl));
+       for (i = 0; i < num_workers; ++i) {
+               void *(*thr_run_func) (void *);
+               int core;
+               int if_idx;
+
+               core = (first_core + i) % core_count;
+
+               if_idx = i % args->appl.if_count;
+
+               args->thread[i].pktio_dev = args->appl.if_names[if_idx];
+               args->thread[i].pool = pool;
+               args->thread[i].mode = args->appl.mode;
+               args->thread[i].type = args->appl.type;
+               args->thread[i].fanout = args->appl.fanout;
+
+               if (f) {
+                       thr_run_func = ring_thread;
+               } else {
+                       if (args->appl.mode == APPL_MODE_PKT_BURST)
+                               thr_run_func = pktio_ifburst_thread;
+                       else /* APPL_MODE_PKT_QUEUE */
+                               thr_run_func = pktio_queue_thread;
+               }
+               /*
+                * Create threads one-by-one instead of all-at-once,
+                * because each thread might get different arguments.
+                * Calls odp_thread_create(cpu) for each thread
+                */
+               odph_linux_pthread_create(thread_tbl, 1, core, thr_run_func,
+                                         &args->thread[i]);
+       }
+
+       /* Master thread waits for other threads to exit */
+       odph_linux_pthread_join(thread_tbl, num_workers);
+
+       printf("Exit\n\n");
+
+       return 0;
+}
+
+/**
+ * Drop packets which input parsing marked as containing errors.
+ *
+ * Frees packets with error and modifies pkt_tbl[] to only contain packets with
+ * no detected errors.
+ *
+ * @param pkt_tbl  Array of packet
+ * @param len      Length of pkt_tbl[]
+ *
+ * @return Number of packets with no detected error
+ */
+static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len)
+{
+       odp_packet_t pkt;
+       unsigned pkt_cnt = len;
+       unsigned i, j;
+
+       for (i = 0, j = 0; i < len; ++i) {
+               pkt = pkt_tbl[i];
+
+               if (odp_unlikely(odp_packet_error(pkt))) {
+                       odph_packet_free(pkt); /* Drop */
+                       pkt_cnt--;
+               } else if (odp_unlikely(i != j++)) {
+                       pkt_tbl[j-1] = pkt;
+               }
+       }
+
+       return pkt_cnt;
+}
+
+/**
+ * Parse and store the command line arguments
+ *
+ * @param argc       argument count
+ * @param argv[]     argument vector
+ * @param appl_args  Store application arguments here
+ */
+static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
+{
+       int opt;
+       int long_index;
+       char *names, *str, *token, *save;
+       int i;
+       int len;
+       static struct option longopts[] = {
+               {"count", required_argument, NULL, 'c'},
+               {"interface", required_argument, NULL, 'i'},    /* return 'i' */
+               {"mode", required_argument, NULL, 'm'},         /* return 'm' */
+               {"help", no_argument, NULL, 'h'},               /* return 'h' */
+               {NULL, 0, NULL, 0}
+       };
+
+       appl_args->mode = -1; /* Invalid, must be changed by parsing */
+       appl_args->type = 3;  /* 3: ODP_PKTIO_TYPE_SOCKET_MMAP */
+       appl_args->fanout = 1; /* turn off fanout by default for mmap */
+
+       while (1) {
+               opt = getopt_long(argc, argv, "+c:i:m:t:f:h",
+                                 longopts, &long_index);
+
+               if (opt == -1)
+                       break;  /* No more options */
+
+               switch (opt) {
+               case 'c':
+                       appl_args->core_count = atoi(optarg);
+                       break;
+                       /* parse packet-io interface names */
+               case 'i':
+                       len = strlen(optarg);
+                       if (len == 0) {
+                               usage(argv[0]);
+                               exit(EXIT_FAILURE);
+                       }
+                       len += 1;       /* add room for '\0' */
+
+                       names = malloc(len);
+                       if (names == NULL) {
+                               usage(argv[0]);
+                               exit(EXIT_FAILURE);
+                       }
+
+                       /* count the number of tokens separated by ',' */
+                       strcpy(names, optarg);
+                       for (str = names, i = 0;; str = NULL, i++) {
+                               token = strtok_r(str, ",", &save);
+                               if (token == NULL)
+                                       break;
+                       }
+                       appl_args->if_count = i;
+
+                       if (appl_args->if_count == 0) {
+                               usage(argv[0]);
+                               exit(EXIT_FAILURE);
+                       }
+
+                       /* allocate storage for the if names */
+                       appl_args->if_names =
+                           calloc(appl_args->if_count, sizeof(char *));
+
+                       /* store the if names (reset names string) */
+                       strcpy(names, optarg);
+                       for (str = names, i = 0;; str = NULL, i++) {
+                               token = strtok_r(str, ",", &save);
+                               if (token == NULL)
+                                       break;
+                               appl_args->if_names[i] = token;
+                       }
+                       break;
+
+               case 'm':
+                       i = atoi(optarg);
+                       if (i == 0)
+                               appl_args->mode = APPL_MODE_PKT_BURST;
+                       else
+                               appl_args->mode = APPL_MODE_PKT_QUEUE;
+                       break;
+
+               case 't':
+                       appl_args->type = atoi(optarg);
+                       break;
+
+               case 'f':
+                       appl_args->fanout = atoi(optarg);
+                       break;
+
+               case 'h':
+                       usage(argv[0]);
+                       exit(EXIT_SUCCESS);
+                       break;
+
+               default:
+                       break;
+               }
+       }
+
+       if (appl_args->if_count == 0 || appl_args->mode == -1) {
+               usage(argv[0]);
+               exit(EXIT_FAILURE);
+       }
+
+       optind = 1;             /* reset 'extern optind' from the getopt lib */
+}
+
+/**
+ * Print system and application info
+ */
+static void print_info(char *progname, appl_args_t *appl_args)
+{
+       int i;
+
+       printf("\n"
+              "ODP system info\n"
+              "---------------\n"
+              "ODP API version: %s\n"
+              "CPU model:       %s\n"
+              "CPU freq (hz):   %"PRIu64"\n"
+              "Cache line size: %i\n"
+              "Core count:      %i\n"
+              "\n",
+              odp_version_api_str(), odp_sys_cpu_model_str(), odp_sys_cpu_hz(),
+              odp_sys_cache_line_size(), odp_sys_core_count());
+
+       printf("Running ODP appl: \"%s\"\n"
+              "-----------------\n"
+              "IF-count:        %i\n"
+              "Using IFs:      ",
+              progname, appl_args->if_count);
+       for (i = 0; i < appl_args->if_count; ++i)
+               printf(" %s", appl_args->if_names[i]);
+       printf("\n"
+              "Mode:            ");
+       if (appl_args->mode == APPL_MODE_PKT_BURST)
+               PRINT_APPL_MODE(APPL_MODE_PKT_BURST);
+       else
+               PRINT_APPL_MODE(APPL_MODE_PKT_QUEUE);
+       printf("\n\n");
+       fflush(NULL);
+}
+
+/**
+ * Prinf usage information
+ */
+static void usage(char *progname)
+{
+       printf("\n"
+              "Usage: %s OPTIONS\n"
+              "  E.g. %s -i eth1,eth2,eth3 -m 0\n"
+              "\n"
+              "OpenDataPlane example application.\n"
+              "\n"
+              "Mandatory OPTIONS:\n"
+              "  -i, --interface Eth interfaces (comma-separated, no spaces)\n"
+              "  -m, --mode      0: Burst send&receive packets (no queues)\n"
+              "                  1: Send&receive packets through ODP queues.\n"
+              " -t, --type   1: ODP_PKTIO_TYPE_SOCKET_BASIC\n"
+              "              2: ODP_PKTIO_TYPE_SOCKET_MMSG\n"
+              "              3: ODP_PKTIO_TYPE_SOCKET_MMAP\n"
+              "              4: ODP_PKTIO_TYPE_NETMAP\n"
+              "         Default: 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n"
+              " -f, --fanout 0: off 1: on (Default 1: on)\n"
+              "\n"
+              "Optional OPTIONS\n"
+              "  -c, --count <number> Core count.\n"
+              "  -h, --help           Display help and exit.\n"
+              "\n", NO_PATH(progname), NO_PATH(progname)
+           );
+}
diff --git a/example/l2fwd/odp_l2fwd.c b/example/l2fwd/odp_l2fwd.c
index 6a38c1b..8885eab 100644
--- a/example/l2fwd/odp_l2fwd.c
+++ b/example/l2fwd/odp_l2fwd.c
@@ -329,7 +329,8 @@ int main(int argc, char *argv[])
        }
 
        /* Reserve memory for args from shared mem */
-       gbl_args = odp_shm_reserve("shm_args", sizeof(args_t), 
ODP_CACHE_LINE_SIZE);
+       gbl_args = odp_shm_reserve("shm_args", sizeof(args_t),
+                                ODP_CACHE_LINE_SIZE, ODP_SHM_THREAD);
        if (gbl_args == NULL) {
                ODP_ERR("Error: shared mem alloc failed.\n");
                exit(EXIT_FAILURE);
@@ -380,7 +381,8 @@ int main(int argc, char *argv[])
 
        /* Create packet pool */
        pool_base = odp_shm_reserve("shm_packet_pool",
-                                   SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE);
+                                   SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE,
+                                   ODP_SHM_THREAD);
        if (pool_base == NULL) {
                ODP_ERR("Error: packet pool mem alloc failed.\n");
                exit(EXIT_FAILURE);
diff --git a/example/odp_example/odp_example.c 
b/example/odp_example/odp_example.c
index 40d237b..f680df0 100644
--- a/example/odp_example/odp_example.c
+++ b/example/odp_example/odp_example.c
@@ -1004,7 +1004,8 @@ int main(int argc, char *argv[])
         * Create message pool
         */
        pool_base = odp_shm_reserve("msg_pool",
-                                   MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE);
+                                   MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE,
+                                   ODP_SHM_THREAD);
 
        pool = odp_buffer_pool_create("msg_pool", pool_base, MSG_POOL_SIZE,
                                      sizeof(test_message_t),
diff --git a/example/packet/odp_pktio.c b/example/packet/odp_pktio.c
index 88ce575..c397936 100644
--- a/example/packet/odp_pktio.c
+++ b/example/packet/odp_pktio.c
@@ -317,7 +317,8 @@ int main(int argc, char *argv[])
        }
 
        /* Reserve memory for args from shared mem */
-       args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE);
+       args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE,
+                               ODP_SHM_THREAD);
        if (args == NULL) {
                ODP_ERR("Error: shared mem alloc failed.\n");
                exit(EXIT_FAILURE);
@@ -358,7 +359,8 @@ int main(int argc, char *argv[])
 
        /* Create packet pool */
        pool_base = odp_shm_reserve("shm_packet_pool",
-                                   SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE);
+                                   SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE,
+                                   ODP_SHM_THREAD);
        if (pool_base == NULL) {
                ODP_ERR("Error: packet pool mem alloc failed.\n");
                exit(EXIT_FAILURE);
diff --git a/example/timer/odp_timer_test.c b/example/timer/odp_timer_test.c
index 4168856..8a2d566 100644
--- a/example/timer/odp_timer_test.c
+++ b/example/timer/odp_timer_test.c
@@ -315,7 +315,8 @@ int main(int argc, char *argv[])
         * Create message pool
         */
        pool_base = odp_shm_reserve("msg_pool",
-                                   MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE);
+                                   MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE,
+                                   ODP_SHM_THREAD);
 
        pool = odp_buffer_pool_create("msg_pool", pool_base, MSG_POOL_SIZE,
                                      0,
diff --git a/helper/include/odph_ring.h b/helper/include/odph_ring.h
index 76c1db8..1d81b5f 100644
--- a/helper/include/odph_ring.h
+++ b/helper/include/odph_ring.h
@@ -158,6 +158,8 @@ typedef struct odph_ring {
 
 #define ODPH_RING_F_SP_ENQ 0x0001 /* The default enqueue is 
"single-producer".*/
 #define ODPH_RING_F_SC_DEQ 0x0002 /* The default dequeue is 
"single-consumer".*/
+#define ODPH_RING_SHM_PROC 0x0004 /* If set - ring is visible from different
+                                   processes. Default is thread visible.     */
 #define ODPH_RING_QUOT_EXCEED (1 << 31)  /* Quota exceed for burst ops */
 #define ODPH_RING_SZ_MASK  (unsigned)(0x0fffffff) /* Ring size mask */
 
diff --git a/platform/linux-generic/include/api/odp_pktio_types.h 
b/platform/linux-generic/include/api/odp_pktio_types.h
index 54ce459..79c0f48 100644
--- a/platform/linux-generic/include/api/odp_pktio_types.h
+++ b/platform/linux-generic/include/api/odp_pktio_types.h
@@ -30,6 +30,7 @@ typedef enum {
        ODP_PKTIO_TYPE_SOCKET_MMSG,
        ODP_PKTIO_TYPE_SOCKET_MMAP,
        ODP_PKTIO_TYPE_NETMAP,
+       ODP_PKTIO_TYPE_IPC,
 } odp_pktio_type_t;
 
 #include <odp_pktio_socket.h>
diff --git a/platform/linux-generic/include/api/odp_queue.h 
b/platform/linux-generic/include/api/odp_queue.h
index 5e083f1..4700a62 100644
--- a/platform/linux-generic/include/api/odp_queue.h
+++ b/platform/linux-generic/include/api/odp_queue.h
@@ -44,6 +44,8 @@ typedef int odp_queue_type_t;
 #define ODP_QUEUE_TYPE_POLL   1  /**< Not scheduled queue */
 #define ODP_QUEUE_TYPE_PKTIN  2  /**< Packet input queue */
 #define ODP_QUEUE_TYPE_PKTOUT 3  /**< Packet output queue */
+#define ODP_QUEUE_TYPE_IPC    4  /**< Packet ipc queue */
+#define ODP_QUEUE_TYPE_IPC_LOOKUP    5  /**< Packet ipc queue */
 
 /**
  * ODP schedule priority
diff --git a/platform/linux-generic/include/api/odp_shared_memory.h 
b/platform/linux-generic/include/api/odp_shared_memory.h
index 8ac8847..1d8e8bb 100644
--- a/platform/linux-generic/include/api/odp_shared_memory.h
+++ b/platform/linux-generic/include/api/odp_shared_memory.h
@@ -24,6 +24,13 @@ extern "C" {
 /** Maximum shared memory block name lenght in chars */
 #define ODP_SHM_NAME_LEN 32
 
+typedef enum {
+       ODP_SHM_THREAD = 1,        /**< Memory accessible by threads.  */
+       ODP_SHM_PROC = 2,          /**< Memory accessible by processes.
+                                   Will be created if not exist.  */
+       ODP_SHM_PROC_NOCREAT = 3,  /**< Memory accessible by processes.
+                                   Has to be created before usage.*/
+} odp_shm_e;
 
 /**
  * Reserve a block of shared memory
@@ -31,10 +38,12 @@ extern "C" {
  * @param name   Name of the block (maximum ODP_SHM_NAME_LEN - 1 chars)
  * @param size   Block size in bytes
  * @param align  Block alignment in bytes
+ * @param flag   Flags for shared memory creation
  *
  * @return Pointer to the reserved block, or NULL
  */
-void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align);
+void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
+                     odp_shm_e flag);
 
 /**
  * Lookup for a block of shared memory
@@ -44,6 +53,7 @@ void *odp_shm_reserve(const char *name, uint64_t size, 
uint64_t align);
  * @return Pointer to the block, or NULL
  */
 void *odp_shm_lookup(const char *name);
+int odp_shm_lookup_ipc(const char *name);
 
 
 /**
diff --git a/platform/linux-generic/include/odp_packet_io_internal.h 
b/platform/linux-generic/include/odp_packet_io_internal.h
index 881cc5f..77fff96 100644
--- a/platform/linux-generic/include/odp_packet_io_internal.h
+++ b/platform/linux-generic/include/odp_packet_io_internal.h
@@ -35,6 +35,7 @@ struct pktio_entry {
 #ifdef ODP_HAVE_NETMAP
        pkt_netmap_t pkt_nm;            /**< using netmap API for IO */
 #endif
+       odp_buffer_pool_t pool;         /**< reference to packet pool */
 };
 
 typedef union {
diff --git a/platform/linux-generic/include/odp_queue_internal.h 
b/platform/linux-generic/include/odp_queue_internal.h
index 8b6c517..077fafb 100644
--- a/platform/linux-generic/include/odp_queue_internal.h
+++ b/platform/linux-generic/include/odp_queue_internal.h
@@ -23,6 +23,7 @@ extern "C" {
 #include <odp_packet_io.h>
 #include <odp_align.h>
 
+#include <odph_ring.h>
 
 #define USE_TICKETLOCK
 
@@ -39,6 +40,9 @@ extern "C" {
 #define QUEUE_STATUS_NOTSCHED 2
 #define QUEUE_STATUS_SCHED    3
 
+#define QUEUE_IPC_ENTRIES     4096 /**< number of odp buffers in
+                                       odp ring queue */
+
 /* forward declaration */
 union queue_entry_u;
 
@@ -65,13 +69,13 @@ struct queue_entry_s {
        deq_func_t       dequeue;
        enq_multi_func_t enqueue_multi;
        deq_multi_func_t dequeue_multi;
-
        odp_queue_t       handle;
        odp_buffer_t      sched_buf;
        odp_queue_type_t  type;
        odp_queue_param_t param;
        odp_pktio_t       pktin;
        odp_pktio_t       pktout;
+       odph_ring_t       *r;    /* odph_ring ref for ipc queue */
        char              name[ODP_QUEUE_NAME_LEN];
 };
 
@@ -84,10 +88,18 @@ typedef union queue_entry_u {
 queue_entry_t *get_qentry(uint32_t queue_id);
 
 int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+int queue_enq_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
+
 odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
+odp_buffer_hdr_t *queue_deq_ipc(queue_entry_t *queue);
 
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int 
num);
+int queue_enq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+                       int num);
+
 int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int 
num);
+int queue_deq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
+                       int num);
 
 void queue_lock(queue_entry_t *queue);
 void queue_unlock(queue_entry_t *queue);
diff --git a/platform/linux-generic/odp_buffer_pool.c 
b/platform/linux-generic/odp_buffer_pool.c
index e538f04..390e3a6 100644
--- a/platform/linux-generic/odp_buffer_pool.c
+++ b/platform/linux-generic/odp_buffer_pool.c
@@ -117,7 +117,8 @@ int odp_buffer_pool_init_global(void)
 
        pool_tbl = odp_shm_reserve("odp_buffer_pools",
                                   sizeof(pool_table_t),
-                                  sizeof(pool_entry_t));
+                                  sizeof(pool_entry_t),
+                                  ODP_SHM_THREAD);
 
        if (pool_tbl == NULL)
                return -1;
diff --git a/platform/linux-generic/odp_crypto.c 
b/platform/linux-generic/odp_crypto.c
index fae546c..30904b1 100644
--- a/platform/linux-generic/odp_crypto.c
+++ b/platform/linux-generic/odp_crypto.c
@@ -409,7 +409,8 @@ odp_crypto_init_global(void)
        mem_size += (MAX_SESSIONS * sizeof(odp_crypto_generic_session_t));
 
        /* Allocate our globally shared memory */
-       global = odp_shm_reserve("crypto_pool", mem_size, ODP_CACHE_LINE_SIZE);
+       global = odp_shm_reserve("crypto_pool", mem_size, ODP_CACHE_LINE_SIZE,
+                                ODP_SHM_THREAD);
 
        /* Clear it out */
        memset(global, 0, mem_size);
diff --git a/platform/linux-generic/odp_packet_io.c 
b/platform/linux-generic/odp_packet_io.c
index 33ade10..00d7bc9 100644
--- a/platform/linux-generic/odp_packet_io.c
+++ b/platform/linux-generic/odp_packet_io.c
@@ -55,7 +55,8 @@ int odp_pktio_init_global(void)
 
        pktio_tbl = odp_shm_reserve("odp_pktio_entries",
                                    sizeof(pktio_table_t),
-                                   sizeof(pktio_entry_t));
+                                   sizeof(pktio_entry_t),
+                                   ODP_SHM_THREAD);
        if (pktio_tbl == NULL)
                return -1;
 
@@ -129,6 +130,8 @@ static void init_pktio_entry(pktio_entry_t *entry, 
odp_pktio_params_t *params)
                memset(&entry->s.pkt_nm, 0, sizeof(entry->s.pkt_nm));
                break;
 #endif
+       case ODP_PKTIO_TYPE_IPC:
+               break;
        default:
                ODP_ERR("Packet I/O type not supported. Please recompile\n");
                break;
@@ -194,6 +197,8 @@ odp_pktio_t odp_pktio_open(const char *dev, 
odp_buffer_pool_t pool,
                ODP_DBG("Allocating netmap pktio\n");
                break;
 #endif
+       case ODP_PKTIO_TYPE_IPC:
+               break;
        default:
                ODP_ERR("Invalid pktio type: %02x\n", params->type);
                return ODP_PKTIO_INVALID;
@@ -239,6 +244,9 @@ odp_pktio_t odp_pktio_open(const char *dev, 
odp_buffer_pool_t pool,
                }
                break;
 #endif
+       case ODP_PKTIO_TYPE_IPC:
+               pktio_entry->s.pool = pool;
+               break;
        default:
                free_pktio_entry(id);
                id = ODP_PKTIO_INVALID;
@@ -381,11 +389,22 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t 
queue)
        pktio_entry_t *pktio_entry = get_entry(id);
        queue_entry_t *qentry = queue_to_qentry(queue);
 
-       if (pktio_entry == NULL || qentry == NULL)
+       if (pktio_entry == NULL || qentry == NULL) {
+               ODP_ERR("%s() return -q reason %p -- %p\n",
+                       __func__,
+                       pktio_entry, qentry);
                return -1;
+       }
 
-       if (qentry->s.type != ODP_QUEUE_TYPE_PKTIN)
+       switch (qentry->s.type) {
+       case ODP_QUEUE_TYPE_PKTIN:
+       case ODP_QUEUE_TYPE_IPC:
+       case ODP_QUEUE_TYPE_IPC_LOOKUP:
+               break;
+       default:
+               ODP_ERR("%s() type is %d\n", __func__, qentry->s.type);
                return -1;
+       }
 
        lock_entry(pktio_entry);
        pktio_entry->s.inq_default = queue;
@@ -396,6 +415,12 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t queue)
        qentry->s.status = QUEUE_STATUS_SCHED;
        queue_unlock(qentry);
 
+       if (qentry->s.type == ODP_QUEUE_TYPE_IPC)
+               return 0;
+       if (qentry->s.type == ODP_QUEUE_TYPE_IPC_LOOKUP)
+               return 0;
+
+
        odp_schedule_queue(queue, qentry->s.param.sched.prio);
 
        return 0;
diff --git a/platform/linux-generic/odp_queue.c 
b/platform/linux-generic/odp_queue.c
index c637bdf..bb8b9b6 100644
--- a/platform/linux-generic/odp_queue.c
+++ b/platform/linux-generic/odp_queue.c
@@ -21,6 +21,11 @@
 #include <odp_hints.h>
 #include <odp_sync.h>
 
+#include <odph_ring.h>
+
+#include <sys/types.h>
+#include <unistd.h>
+
 #ifdef USE_TICKETLOCK
 #include <odp_ticketlock.h>
 #define LOCK(a)      odp_ticketlock_lock(a)
@@ -34,7 +39,7 @@
 #endif
 
 #include <string.h>
-
+#include <stdlib.h>
 
 typedef struct queue_table_t {
        queue_entry_t  queue[ODP_CONFIG_QUEUES];
@@ -77,6 +82,35 @@ static void queue_init(queue_entry_t *queue, const char 
*name,
                queue->s.enqueue_multi = pktout_enq_multi;
                queue->s.dequeue_multi = pktout_deq_multi;
                break;
+       case ODP_QUEUE_TYPE_IPC:
+               queue->s.r = odph_ring_lookup(name);
+               if (!queue->s.r) {
+                       queue->s.r = odph_ring_create(name,
+                                       QUEUE_IPC_ENTRIES,
+                                       ODPH_RING_SHM_PROC);
+                       if (queue->s.r == NULL)
+                               ODP_ERR("ring create failed\n");
+               }
+               queue->s.enqueue = queue_enq_ipc;
+               queue->s.dequeue = queue_deq_ipc;
+               queue->s.enqueue_multi = queue_enq_multi_ipc;
+               queue->s.dequeue_multi = queue_deq_multi_ipc;
+               break;
+       case ODP_QUEUE_TYPE_IPC_LOOKUP:
+               if (odp_shm_lookup_ipc(name) == 1) {
+                       size_t ring_size =  QUEUE_IPC_ENTRIES * sizeof(void *)
+                                       + sizeof(odph_ring_t);
+                       queue->s.r = odp_shm_reserve(name, ring_size,
+                                       ODP_CACHE_LINE_SIZE,
+                                       ODP_SHM_PROC_NOCREAT);
+                       if (queue->s.r == NULL)
+                               ODP_ERR("LOOKUP ring create failed\n");
+               }
+               queue->s.enqueue = queue_enq_ipc;
+               queue->s.dequeue = queue_deq_ipc;
+               queue->s.enqueue_multi = queue_enq_multi_ipc;
+               queue->s.dequeue_multi = queue_deq_multi_ipc;
+               break;
        default:
                queue->s.enqueue = queue_enq;
                queue->s.dequeue = queue_deq;
@@ -99,7 +133,8 @@ int odp_queue_init_global(void)
 
        queue_tbl = odp_shm_reserve("odp_queues",
                                    sizeof(queue_table_t),
-                                   sizeof(queue_entry_t));
+                                   sizeof(queue_entry_t),
+                                   ODP_SHM_THREAD);
 
        if (queue_tbl == NULL)
                return -1;
@@ -113,6 +148,11 @@ int odp_queue_init_global(void)
                queue->s.handle = queue_from_id(i);
        }
 
+       /* for linux-generic IPC queue implemented totaly in
+        * software using odp_ring.
+        */
+       odph_ring_tailq_init();
+
        ODP_DBG("done\n");
        ODP_DBG("Queue init global\n");
        ODP_DBG("  struct queue_entry_s size %zu\n",
@@ -243,6 +283,27 @@ odp_queue_t odp_queue_lookup(const char *name)
                UNLOCK(&queue->s.lock);
        }
 
+       /* do look up for shared memory object if exist return that queue*/
+       odph_ring_t *r;
+
+       r = odph_ring_lookup(name);
+       if (r == NULL) {
+               if (odp_shm_lookup_ipc(name) == 1) {
+                       /* Create local IPC queue connected to shm object */
+                       odp_queue_t q = odp_queue_create(name,
+                                               ODP_QUEUE_TYPE_IPC_LOOKUP,
+                                               NULL);
+                       if (q != ODP_QUEUE_INVALID)
+                               return q;
+               }
+       } else {
+               /* odp ring is in odp_ring_list. That means current process
+                * already created link with such name. That might be ipc
+                * queue or ring itself. For now print error here.
+                */
+               ODP_ERR("odp ring with name: \"%s\" already initialized\n", 
name);
+       }
+
        return ODP_QUEUE_INVALID;
 }
 
@@ -276,6 +337,38 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t 
*buf_hdr)
        return 0;
 }
 
+int queue_enq_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
+{
+       int ret;
+       odph_ring_t *r = queue->s.r;
+       odp_buffer_bits_t handle;
+       uint32_t index = buf_hdr->handle.index;
+       uint32_t pool_id = buf_hdr->handle.pool_id;
+       odp_buffer_t buf;
+       void **rbuf_p;
+
+       /* get buffer from buf_hdr */
+       handle.index = index;
+       handle.pool_id = pool_id;
+
+       buf =  handle.u32;
+
+       rbuf_p = (void *)&buf;
+       /* use odp_ring locks instead of per process queue lock
+        * LOCK(&queue->s.lock);
+        */
+       /* queue buffer to the ring. Note: we can't use pointer to buf_hdr
+        * here due to poiter will be referenced in different porocess
+        */
+       ret = odph_ring_mp_enqueue_bulk(r, rbuf_p, 1);
+       if (ret != 0)
+               ODP_ERR("odp_ring_mp_enqueue_bulk fail\n");
+       /*
+        * UNLOCK(&queue->s.lock);
+        */
+       return 0;
+}
+
 
 int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 {
@@ -311,6 +404,45 @@ int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t 
*buf_hdr[], int num)
        return 0;
 }
 
+int queue_enq_multi_ipc(queue_entry_t *queue,
+                       odp_buffer_hdr_t *buf_hdr[], int num)
+{
+       int i;
+       int ret = 0;
+       odph_ring_t *r = queue->s.r;
+       odp_buffer_bits_t handle;
+       odp_buffer_t buf;
+       void **rbuf_p;
+
+       /* use odp_ring locks instead of per process queue lock
+        * LOCK(&queue->s.lock);
+        */
+
+       /* odp_buffer_t buffers can be in not continius memory,
+        * so queue them to IPC ring one by one.
+        */
+       for (i = 0; i < num; i++) {
+               handle.index = buf_hdr[i]->handle.index;
+               handle.pool_id =  buf_hdr[i]->handle.pool_id;
+
+               buf =  handle.u32;
+
+               rbuf_p = (void *)&buf;
+
+               /* queue buffer to the ring. Note: we can't use pointer
+                * to buf_hdr here due to poiter will be referenced in
+                * different porocess.
+                */
+               ret += odph_ring_mp_enqueue_bulk(r, rbuf_p, 1);
+               if (ret != 0)
+                       ODP_ERR("odp_ring_mp_enqueue_bulk fail\n");
+       }
+       /*
+        * UNLOCK(&queue->s.lock);
+        */
+
+       return ret;
+}
 
 int odp_queue_enq_multi(odp_queue_t handle, odp_buffer_t buf[], int num)
 {
@@ -369,6 +501,72 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
        return buf_hdr;
 }
 
+odp_buffer_hdr_t *queue_deq_ipc(queue_entry_t *queue)
+{
+       odp_buffer_hdr_t *buf_hdr = NULL;
+       odph_ring_t *r = queue->s.r;
+       int ret;
+       odp_buffer_t buf;
+       void **buf_p = (void *)&buf;
+
+       /* using odp_ring lock
+        * LOCK(&queue->s.lock);
+        */
+       ret = odph_ring_mc_dequeue_bulk(r, buf_p, 1);
+       if (ret == 0)
+               buf_hdr = odp_buf_to_hdr(buf);
+       /*
+        * UNLOCK(&queue->s.lock);
+        */
+
+       return buf_hdr;
+}
+
+int queue_deq_multi_ipc(queue_entry_t *queue,
+                       odp_buffer_hdr_t *buf_hdr[], int num)
+{
+       int i = 0;
+       odph_ring_t *r = queue->s.r;
+       int ret;
+       odp_buffer_t buf;
+       odp_buffer_t ipcbufs[QUEUE_IPC_ENTRIES];
+       void **ipcbufs_p = (void *)&ipcbufs;
+
+       /* use odp ring lock
+        * LOCK(&queue->s.lock);
+        */
+
+       if (queue->s.head == NULL) {
+               /* Already empty queue */
+       } else {
+               odp_buffer_hdr_t *hdr = queue->s.head;
+
+               ret = odph_ring_mc_dequeue_bulk(r, ipcbufs_p, num);
+               if (ret == 0) {
+                       for (; i < num && hdr; i++) {
+                               memcpy(&buf, (void *)ipcbufs_p[i],
+                                      sizeof(odp_buffer_t));
+
+                               buf_hdr[i] = odp_buf_to_hdr(buf);
+                               hdr              = hdr->next;
+                               buf_hdr[i]->next = NULL;
+                       }
+               }
+
+               queue->s.head = hdr;
+
+               if (hdr == NULL) {
+                       /* Queue is now empty */
+                       queue->s.tail = NULL;
+               }
+       }
+
+       /* use odp_ring lock
+        * UNLOCK(&queue->s.lock);
+        */
+
+       return i;
+}
 
 int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int num)
 {
diff --git a/platform/linux-generic/odp_ring.c 
b/platform/linux-generic/odp_ring.c
index 720d8db..e4e3f23 100644
--- a/platform/linux-generic/odp_ring.c
+++ b/platform/linux-generic/odp_ring.c
@@ -158,6 +158,12 @@ odph_ring_create(const char *name, unsigned count, 
unsigned flags)
        char ring_name[ODPH_RING_NAMESIZE];
        odph_ring_t *r;
        size_t ring_size;
+       odp_shm_e shm_flag;
+
+       if (flags & ODPH_RING_SHM_PROC)
+               shm_flag = ODP_SHM_PROC;
+       else
+               shm_flag = ODP_SHM_THREAD;
 
        /* count must be a power of 2 */
        if (!ODP_VAL_IS_POWER_2(count) || (count > ODPH_RING_SZ_MASK)) {
@@ -171,7 +177,8 @@ odph_ring_create(const char *name, unsigned count, unsigned 
flags)
 
        odp_rwlock_write_lock(&qlock);
        /* reserve a memory zone for this ring.*/
-       r = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE);
+       r = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE,
+                           shm_flag);
 
        if (r != NULL) {
                /* init the ring structure */
@@ -549,7 +556,7 @@ void odph_ring_list_dump(void)
 /* search a ring from its name */
 odph_ring_t *odph_ring_lookup(const char *name)
 {
-       odph_ring_t *r = odp_shm_lookup(name);
+       odph_ring_t *r;
 
        odp_rwlock_read_lock(&qlock);
        TAILQ_FOREACH(r, &odp_ring_list, next) {
diff --git a/platform/linux-generic/odp_schedule.c 
b/platform/linux-generic/odp_schedule.c
index 9e399f1..9e76692 100644
--- a/platform/linux-generic/odp_schedule.c
+++ b/platform/linux-generic/odp_schedule.c
@@ -89,7 +89,8 @@ int odp_schedule_init_global(void)
 
        sched = odp_shm_reserve("odp_scheduler",
                                sizeof(sched_t),
-                               ODP_CACHE_LINE_SIZE);
+                               ODP_CACHE_LINE_SIZE,
+                               ODP_SHM_THREAD);
 
        if (sched == NULL) {
                ODP_ERR("Schedule init: Shm reserve failed.\n");
@@ -98,7 +99,8 @@ int odp_schedule_init_global(void)
 
 
        pool_base = odp_shm_reserve("odp_sched_pool",
-                                   SCHED_POOL_SIZE, ODP_CACHE_LINE_SIZE);
+                                   SCHED_POOL_SIZE, ODP_CACHE_LINE_SIZE,
+                                   ODP_SHM_THREAD);
 
        pool = odp_buffer_pool_create("odp_sched_pool", pool_base,
                                      SCHED_POOL_SIZE, sizeof(queue_desc_t),
diff --git a/platform/linux-generic/odp_shared_memory.c 
b/platform/linux-generic/odp_shared_memory.c
index 784f42b..fc75f60 100644
--- a/platform/linux-generic/odp_shared_memory.c
+++ b/platform/linux-generic/odp_shared_memory.c
@@ -14,10 +14,14 @@
 #include <sys/mman.h>
 #include <asm/mman.h>
 #include <fcntl.h>
+#include <unistd.h>
+#include <sys/types.h>
 
 #include <stdio.h>
 #include <string.h>
 
+#include <odph_ring.h>
+#include <stdlib.h>
 
 #define ODP_SHM_NUM_BLOCKS 32
 
@@ -59,9 +63,8 @@ int odp_shm_init_global(void)
        ODP_DBG("NOTE: mmap does not support huge pages\n");
 #endif
 
-       addr = mmap(NULL, sizeof(odp_shm_table_t),
-                   PROT_READ | PROT_WRITE, SHM_FLAGS, -1, 0);
-
+       /* malloc instead of mmap to bind table to process. */
+       addr = malloc(sizeof(odp_shm_table_t));
        if (addr == MAP_FAILED)
                return -1;
 
@@ -95,9 +98,12 @@ static int find_block(const char *name)
 }
 
 
-void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align)
+void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
+                     odp_shm_e flag)
 {
-       int i;
+       int i, ret, shm_open_flags;
+       int shm = -1;
+       int mmap_flags = MAP_SHARED;
        odp_shm_block_t *block;
        void *addr;
 #ifdef MAP_HUGETLB
@@ -109,6 +115,15 @@ void *odp_shm_reserve(const char *name, uint64_t size, 
uint64_t align)
 
        odp_spinlock_lock(&odp_shm_tbl->lock);
 
+       /* if object was already created return it's address */
+       if (flag == ODP_SHM_PROC_NOCREAT) {
+               for (i = 0; i < ODP_SHM_NUM_BLOCKS; i++) {
+                       if (strcmp(name, odp_shm_tbl->block[i].name) == 0) {
+                               return odp_shm_tbl->block[i].addr;
+                       }
+               }
+       }
+
        if (find_block(name) >= 0) {
                /* Found a block with the same name */
                odp_spinlock_unlock(&odp_shm_tbl->lock);
@@ -123,8 +138,8 @@ void *odp_shm_reserve(const char *name, uint64_t size, 
uint64_t align)
        }
 
        if (i > ODP_SHM_NUM_BLOCKS - 1) {
-               /* Table full */
                odp_spinlock_unlock(&odp_shm_tbl->lock);
+               ODP_ERR("ODP_SHM_NUM_BLOCKS table is full");
                return NULL;
        }
 
@@ -133,19 +148,42 @@ void *odp_shm_reserve(const char *name, uint64_t size, 
uint64_t align)
        addr        = MAP_FAILED;
        block->huge = 0;
 
+       if (flag != ODP_SHM_THREAD) {
+               shm_open_flags = O_RDWR;
+               if (flag == ODP_SHM_PROC)
+                       shm_open_flags |= O_CREAT;
+
+               shm = shm_open(name, shm_open_flags, S_IRUSR | S_IWUSR);
+               if (shm == -1) {
+                       odp_spinlock_unlock(&odp_shm_tbl->lock);
+                       ODP_ERR("shm_open failed");
+                       return NULL;
+               }
+
+               ret = ftruncate(shm, size + align);
+               if (ret == -1) {
+                       odp_spinlock_unlock(&odp_shm_tbl->lock);
+                       if (flag != ODP_SHM_PROC_NOCREAT)
+                               shm_unlink(name);
+                       ODP_ERR("ftruncate failed");
+                       return NULL;
+               }
+       } else {
+               mmap_flags |= MAP_ANONYMOUS;
+       }
+
 #ifdef MAP_HUGETLB
        /* Try first huge pages */
        if (huge_sz && (size + align) > page_sz) {
                addr = mmap(NULL, size + align, PROT_READ | PROT_WRITE,
-                           SHM_FLAGS | MAP_HUGETLB, -1, 0);
+                           mmap_flags | MAP_HUGETLB, shm, 0);
        }
 #endif
 
        /* Use normal pages for small or failed huge page allocations */
        if (addr == MAP_FAILED) {
                addr = mmap(NULL, size + align, PROT_READ | PROT_WRITE,
-                           SHM_FLAGS, -1, 0);
-
+                           mmap_flags, shm, 0);
        } else {
                block->huge = 1;
        }
@@ -153,6 +191,9 @@ void *odp_shm_reserve(const char *name, uint64_t size, 
uint64_t align)
        if (addr == MAP_FAILED) {
                /* Alloc failed */
                odp_spinlock_unlock(&odp_shm_tbl->lock);
+               if (flag != ODP_SHM_PROC_NOCREAT)
+                       shm_unlink(name);
+               ODP_ERR("MAP_FAILED\n");
                return NULL;
        }
 
@@ -192,6 +233,17 @@ void *odp_shm_lookup(const char *name)
        return addr;
 }
 
+int odp_shm_lookup_ipc(const char *name)
+{
+       int shm;
+
+       shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
+       if (shm == -1)
+               return 0;
+
+       close(shm);
+       return 1;
+}
 
 void odp_shm_print_all(void)
 {
diff --git a/test/api_test/odp_shm_test.c b/test/api_test/odp_shm_test.c
index 318d662..fc448a4 100644
--- a/test/api_test/odp_shm_test.c
+++ b/test/api_test/odp_shm_test.c
@@ -47,7 +47,8 @@ int main(int argc ODP_UNUSED, char *argv[] ODP_UNUSED)
        odp_print_system_info();
 
        test_shared_data = odp_shm_reserve("test_shared_data",
-                                          sizeof(test_shared_data_t), 128);
+                                          sizeof(test_shared_data_t), 128,
+                                          ODP_SHM_THREAD);
        memset(test_shared_data, 0, sizeof(test_shared_data_t));
        printf("test shared data at %p\n\n", test_shared_data);
 
diff --git a/test/api_test/odp_timer_ping.c b/test/api_test/odp_timer_ping.c
index 6ba30d3..907a28d 100644
--- a/test/api_test/odp_timer_ping.c
+++ b/test/api_test/odp_timer_ping.c
@@ -328,7 +328,8 @@ int main(int argc ODP_UNUSED, char *argv[] ODP_UNUSED)
         * Create message pool
         */
        pool_base = odp_shm_reserve("msg_pool",
-                                   MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE);
+                                   MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE,
+                                   ODP_SHM_THREAD);
 
        pool = odp_buffer_pool_create("msg_pool", pool_base, MSG_POOL_SIZE,
                                      BUF_SIZE,
-- 
1.8.5.1.163.gd7aced9


_______________________________________________
lng-odp mailing list
[email protected]
http://lists.linaro.org/mailman/listinfo/lng-odp

Reply via email to