On 2014-09-02 18:30, Maxim Uvarov wrote:
> Implement odp implementation for linux-generic using standard
> odp queue API.
>
> Signed-off-by: Maxim Uvarov <[email protected]>
> ---
>
> New version of IPC queues.
>
> TODO before commit:
> merge this patch with Petri's change to odp_shm_reserve().
>
> Maxim.
>
>
> .gitignore | 5 +
> configure.ac | 1 +
> example/Makefile.am | 2 +-
> example/generator/odp_generator.c | 6 +-
> example/ipc/Makefile.am | 6 +
> example/ipc/README | 46 ++
> example/ipc/odp_ipc.c | 685
> +++++++++++++++++++++
> example/l2fwd/odp_l2fwd.c | 6 +-
> example/odp_example/odp_example.c | 3 +-
> example/packet/odp_pktio.c | 6 +-
> example/timer/odp_timer_test.c | 3 +-
> include/helper/odp_ring.h | 2 +
> include/odp_queue.h | 2 +
> include/odp_shared_memory.h | 12 +-
> .../linux-generic/include/api/odp_pktio_types.h | 1 +
> .../linux-generic/include/odp_packet_io_internal.h | 1 +
> .../linux-generic/include/odp_queue_internal.h | 12 +-
> platform/linux-generic/odp_buffer_pool.c | 3 +-
> platform/linux-generic/odp_packet_io.c | 32 +-
> platform/linux-generic/odp_queue.c | 205 +++++-
> platform/linux-generic/odp_ring.c | 20 +-
> platform/linux-generic/odp_schedule.c | 6 +-
> platform/linux-generic/odp_shared_memory.c | 73 ++-
> test/api_test/odp_shm_test.c | 3 +-
> test/api_test/odp_timer_ping.c | 3 +-
> 25 files changed, 1110 insertions(+), 34 deletions(-)
> create mode 100644 example/ipc/Makefile.am
> create mode 100644 example/ipc/README
> create mode 100644 example/ipc/odp_ipc.c
>
> diff --git a/.gitignore b/.gitignore
> index 39c8d77..7eca389 100644
> --- a/.gitignore
> +++ b/.gitignore
> @@ -5,11 +5,15 @@
> *.patch
> *~
> *.lo
> +*.swp
> +*.swo
> +.dirstamp
Doesn't belong in this commit.
> Makefile
> Makefile.in
> aclocal.m4
> autom4te.cache/
> compile
> +core
> config.guess
> config.sub
> configure
> @@ -32,6 +36,7 @@ lib/
> obj/
> build/
> odp_example
> +odp_ipc
> odp_packet
> odp_packet_netmap
> odp_atomic
> diff --git a/configure.ac b/configure.ac
> index 6b75e66..4f4a913 100644
> --- a/configure.ac
> +++ b/configure.ac
> @@ -119,6 +119,7 @@ AC_CONFIG_FILES([Makefile
> platform/linux-keystone2/Makefile
> platform/linux-dpdk/Makefile
> example/Makefile
> + example/ipc/Makefile
> example/generator/Makefile
> example/l2fwd/Makefile
> example/odp_example/Makefile
> diff --git a/example/Makefile.am b/example/Makefile.am
> index 01a3305..1a5a138 100644
> --- a/example/Makefile.am
> +++ b/example/Makefile.am
> @@ -1 +1 @@
> -SUBDIRS = generator l2fwd odp_example packet packet_netmap timer
> +SUBDIRS = generator l2fwd odp_example packet packet_netmap timer ipc
> diff --git a/example/generator/odp_generator.c
> b/example/generator/odp_generator.c
> index b10372e..102102f 100644
> --- a/example/generator/odp_generator.c
> +++ b/example/generator/odp_generator.c
> @@ -542,7 +542,8 @@ int main(int argc, char *argv[])
> odp_atomic_init_u64(&counters.icmp);
>
> /* Reserve memory for args from shared mem */
> - args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE);
> + args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE,
> + ODP_SHM_THREAD);
> if (args == NULL) {
> ODP_ERR("Error: shared mem alloc failed.\n");
> exit(EXIT_FAILURE);
> @@ -587,7 +588,8 @@ int main(int argc, char *argv[])
>
> /* Create packet pool */
> pool_base = odp_shm_reserve("shm_packet_pool",
> - SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE);
> + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE,
> + ODP_SHM_THREAD);
> if (pool_base == NULL) {
> ODP_ERR("Error: packet pool mem alloc failed.\n");
> exit(EXIT_FAILURE);
> diff --git a/example/ipc/Makefile.am b/example/ipc/Makefile.am
> new file mode 100644
> index 0000000..2fd48f7
> --- /dev/null
> +++ b/example/ipc/Makefile.am
> @@ -0,0 +1,6 @@
> +include $(top_srcdir)/example/Makefile.inc
> +
> +bin_PROGRAMS = odp_ipc
> +odp_ipc_LDFLAGS = $(AM_LDFLAGS) -static
> +
> +dist_odp_ipc_SOURCES = odp_ipc.c
> diff --git a/example/ipc/README b/example/ipc/README
> new file mode 100644
> index 0000000..34f56d7
> --- /dev/null
> +++ b/example/ipc/README
> @@ -0,0 +1,46 @@
> + ODP IPC example
> +
> +This example shows how to use queues to exchange packets between different
> +processes.
> +
> +Example burst mode:
> +./odp_fork -i eth0 -m 1 -c 1
> +On remote host run ping.
Ping what?
I guess the target that runs odp_fork; if so, please write that.
Can you add some ASCII art?
> +
> +[11492/1] enqueue 1 packets, first buf 7921 size 98/1856, cnt 1
> +11490 no valid buffer
> + ring_thread() got buffer from IPC queue size 98/1856
> +[11492/1] enqueue 1 packets, first buf 7905 size 98/1856, cnt 2
> +11490 no valid buffer
> + ring_thread() got buffer from IPC queue size 98/1856
> +[11492/1] enqueue 1 packets, first buf 7889 size 98/1856, cnt 3
> +11490 no valid buffer
> + ring_thread() got buffer from IPC queue size 98/1856
> +[11492/1] enqueue 1 packets, first buf 7873 size 98/1856, cnt 4
> +
> +
> +Main PID/thread [11492/1] enqueues packets to IPC queue with
> odp_queue_enq_multi(),
> +child process thread ring_thread() dequeues packets from ipc queue.
> +
> +
> +Example queue mode:
> +
> +./odp_fork -i eth0 -m 1 -c 1
> +waiting for packet...
> +Enqueue the packet to ipc queue size 98/1856
> +waiting for packet...
> +15917 no valid buffer
> + ring_thread() got buffer from IPC queue size 98/1856
> +Enqueue the packet to ipc queue size 98/1856
> +waiting for packet...
> +15917 no valid buffer
> + ring_thread() got buffer from IPC queue size 98/1856
> +Enqueue the packet to ipc queue size 98/1856
> +waiting for packet...
> +15917 no valid buffer
> + ring_thread() got buffer from IPC queue size 98/1856
> +Enqueue the packet to ipc queue size 98/1856
> +waiting for packet...
> +
> +Thread 15917 moves packets from ingress queue to IPC queue. Other process
> +in ring_thread() thread dequeues packets from IPC queue.
> diff --git a/example/ipc/odp_ipc.c b/example/ipc/odp_ipc.c
> new file mode 100644
> index 0000000..e10874e
> --- /dev/null
> +++ b/example/ipc/odp_ipc.c
> @@ -0,0 +1,685 @@
> +/* Copyright (c) 2014, Linaro Limited
> + * All rights reserved.
> + *
> + * SPDX-License-Identifier: BSD-3-Clause
> + */
> +
> +/**
> + * @file
> + *
> + * @example odp_ipc.c ODP IPC queues example application
> + */
> +
> +#include <stdlib.h>
> +#include <string.h>
> +#include <getopt.h>
> +#include <unistd.h>
> +
> +#include <odp.h>
> +#include <helper/odp_linux.h>
> +#include <helper/odp_packet_helper.h>
> +#include <helper/odp_eth.h>
> +#include <helper/odp_ip.h>
> +#include <helper/odp_ring.h>
> +
> +#define MAX_WORKERS 32
> +#define SHM_PKT_POOL_SIZE (512*2048)
> +#define SHM_PKT_POOL_BUF_SIZE 1856
> +#define MAX_PKT_BURST 16
> +
> +#define APPL_MODE_PKT_BURST 0
> +#define APPL_MODE_PKT_QUEUE 1
> +
> +#define RING_SIZE 4096
> +#define ODP_RING_NAMESIZE 32
> +
> +#define PRINT_APPL_MODE(x) printf("%s(%i)\n", #x, (x))
> +
> +/** Get rid of path in filename - only for unix-type paths using '/' */
> +#define NO_PATH(file_name) (strrchr((file_name), '/') ? \
> + strrchr((file_name), '/') + 1 : (file_name))
> +/**
> + * Parsed command line application arguments
> + */
> +typedef struct {
> + int core_count;
Missing doxygen comment.
> + int if_count; /**< Number of interfaces to be used */
> + char **if_names; /**< Array of pointers to interface names */
> + int mode; /**< Packet IO mode */
> + int type; /**< Packet IO type */
> + int fanout; /**< Packet IO fanout */
Please describe these in more detail in the three comments above.
> + odp_buffer_pool_t pool; /**< Buffer pool for packet IO */
> +} appl_args_t;
> +
> +/**
> + * Thread specific arguments
> + */
> +typedef struct {
> + char *pktio_dev; /**< Interface name to use */
> + odp_buffer_pool_t pool; /**< Buffer pool for packet IO */
> + int mode; /**< Thread mode */
> + int type; /**< Thread i/o type */
> + int fanout; /**< Thread i/o fanout */
Please describe these in more detail in the three comments above.
> + int tpid;
Missing doxygen comment.
> +} thread_args_t;
> +
> +/**
> + * Grouping of both parsed CL args and thread specific args - alloc together
> + */
> +typedef struct {
> + /** Application (parsed) arguments */
> + appl_args_t appl;
> + /** Thread specific arguments */
Can't these two comments go at the end of their lines?
> + thread_args_t thread[MAX_WORKERS];
> +} args_t;
> +
> +/** Global pointer to args */
> +static args_t *args;
> +
> +/* helper funcs */
> +static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len);
> +static void parse_args(int argc, char *argv[], appl_args_t *appl_args);
> +static void print_info(char *progname, appl_args_t *appl_args);
> +static void usage(char *progname);
> +
> +static void *ring_thread(void *arg)
> +{
> + thread_args_t *thr_args;
> + thr_args = arg;
> + int ret;
> + odp_buffer_t buf;
> + odp_buffer_pool_t pkt_pool;
> + odp_pktio_params_t pktio_ipc_params;
> + odp_pktio_t pktio_ipc;
> + odp_queue_t ipcq_def;
> +
> + printf("ODP RING THREAD PID %d\n" ,getpid());
> +
> + pkt_pool = odp_buffer_pool_lookup("packet_pool");
> + if (pkt_pool == ODP_BUFFER_POOL_INVALID) {
Shouldn't we use odp_unlikely() here?
> + ODP_ERR("Error: pkt_pool not found\n");
> + return NULL;
> + }
> +
> + /* create shared queue between processes*/
> + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC;
> + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params);
> + if (pktio_ipc == ODP_PKTIO_INVALID) {
Shouldn't we use odp_unlikely() here?
> + ODP_ERR("Error: pktio create failed\n");
> + return NULL;
> + }
> +
> + if (thr_args->tpid) {
> + while (1) {
> + ipcq_def = odp_queue_lookup("shared-queue");
> + if (ipcq_def != ODP_QUEUE_INVALID) {
> + printf("%s() shared-queue found\n", __func__);
> + break;
> + }
> + sleep(1);
> + }
Could you rewrite the above while loop like this to make it clearer?
ipcq_def = odp_queue_lookup("shared-queue");
while (ipcq_def == ODP_QUEUE_INVALID) {
	sleep(1);
	ipcq_def = odp_queue_lookup("shared-queue");
}
printf("%s() shared-queue found\n", __func__);
> +
> + ret = odp_pktio_inq_setdef(pktio_ipc, ipcq_def);
> + if (ret != 0) {
Shouldn't we use odp_unlikely() here?
> + ODP_ERR("Error: slave thread default ipc-Q setup\n");
> + return NULL;
> + }
> +
> + /* In loop take packets from ipc queue and free this buffer */
> + while (1) {
> + buf = odp_queue_deq(ipcq_def);
> + if (!odp_buffer_is_valid(buf)) {
Shouldn't we use odp_unlikely() here?
> + sleep(1);
> + printf("%d no valid buffer\n", getpid());
> + continue;
> + }
> +
> + //buf = odp_schedule(NULL, ODP_SCHED_WAIT);
Remove this commented-out line.
Cheers,
Anders
> +
> + printf("\t\t%s() got buffer from IPC queue size
> %ld/%ld\n", __func__,
> + (unsigned long)odp_packet_get_len(buf),
> + (unsigned long)odp_buffer_size(buf));
> + odp_buffer_free(buf);
> + }
> + }
> +
> + /* unreachable */
> + return NULL;
> +}
> +
> +
> +/**
> + * Packet IO loopback worker thread using ODP queues
> + *
> + * @param arg thread arguments of type 'thread_args_t *'
> + */
> +static void *pktio_queue_thread(void *arg)
> +{
> + int thr;
> + odp_buffer_pool_t pkt_pool;
> + odp_pktio_t pktio;
> + odp_pktio_t pktio_ipc;
> + thread_args_t *thr_args;
> + odp_queue_t inq_def;
> + odp_queue_t ipcq_def;
> + char inq_name[ODP_QUEUE_NAME_LEN];
> + odp_queue_param_t qparam;
> + odp_buffer_t buf;
> + int ret;
> + odp_pktio_params_t params;
> + odp_pktio_params_t pktio_ipc_params;
> + socket_params_t *sock_params = ¶ms.sock_params;
> +
> + thr_args = arg;
> +
> + thr = odp_thread_id();
> +
> + printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr,
> + thr_args->pktio_dev);
> +
> + /* lookup ring from its name */
> + /* Lookup the packet pool */
> + pkt_pool = odp_buffer_pool_lookup("packet_pool");
> + if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) {
> + ODP_ERR(" [%02i] Error: pkt_pool not found\n", thr);
> + return NULL;
> + }
> +
> + /* Open a packet IO instance for this thread */
> + sock_params->type = thr_args->type;
> + sock_params->fanout = thr_args->fanout;
> + pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, ¶ms);
> + if (pktio == ODP_PKTIO_INVALID) {
> + ODP_ERR(" [%02i] Error: pktio create failed\n", thr);
> + return NULL;
> + }
> +
> + /*
> + * Create and set the default INPUT queue associated with the 'pktio'
> + * resource
> + */
> + qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT;
> + qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC;
> + qparam.sched.group = ODP_SCHED_GROUP_DEFAULT;
> + snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio);
> + inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0';
> +
> + inq_def = odp_queue_create(inq_name, ODP_QUEUE_TYPE_PKTIN, &qparam);
> + if (inq_def == ODP_QUEUE_INVALID) {
> + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr);
> + return NULL;
> + }
> +
> + ret = odp_pktio_inq_setdef(pktio, inq_def);
> + if (ret != 0) {
> + ODP_ERR(" [%02i] Error: default input-Q setup\n", thr);
> + return NULL;
> + }
> +
> + printf(" [%02i] created pktio:%02i, queue mode (ATOMIC queues)\n"
> + " default pktio%02i-INPUT queue:%u\n",
> + thr, pktio, pktio, inq_def);
> +
> + /* create shared queue between processes*/
> + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC;
> + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params);
> + if (pktio_ipc == ODP_PKTIO_INVALID) {
> + ODP_ERR(" [%02i] Error: pktio create failed\n", thr);
> + return NULL;
> + }
> + ipcq_def = odp_queue_create("shared-queue", ODP_QUEUE_TYPE_IPC,
> &qparam);
> + if (ipcq_def == ODP_QUEUE_INVALID) {
> + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr);
> + return NULL;
> + }
> +
> + /* In loop take packets from inq queue and put them to ipc queue */
> + for (;;) {
> + /* Use schedule to get buf from any input queue */
> + printf("waiting for packet...\n");
> + buf = odp_schedule(NULL, ODP_SCHED_WAIT);
> +
> + printf("Enqueue the packet to ipc queue size %ld/%ld\n",
> + (unsigned long)odp_packet_get_len(buf),
> + (unsigned long)odp_buffer_size(buf));
> +
> + odp_queue_enq(ipcq_def, buf);
> + }
> +
> +/* unreachable */
> +}
> +
> +/**
> + * Packet IO loopback worker thread using bursts from/to IO resources
> + *
> + * @param arg thread arguments of type 'thread_args_t *'
> + */
> +static void *pktio_ifburst_thread(void *arg)
> +{
> + int thr;
> + odp_buffer_pool_t pkt_pool;
> + odp_pktio_t pktio;
> + thread_args_t *thr_args;
> + int pkts, pkts_ok;
> + odp_packet_t pkt_tbl[MAX_PKT_BURST];
> + unsigned long pkt_cnt = 0;
> + unsigned long err_cnt = 0;
> + odp_pktio_params_t params;
> + socket_params_t *sock_params = ¶ms.sock_params;
> + int ret;
> +
> + odp_pktio_t pktio_ipc;
> + odp_queue_t ipcq_def;
> + char inq_name[ODP_QUEUE_NAME_LEN];
> + odp_queue_param_t qparam;
> + odp_pktio_params_t pktio_ipc_params;
> +
> + thr = odp_thread_id();
> + thr_args = arg;
> +
> + printf("Pktio thread [%02i] starts, pktio_dev:%s\n", thr,
> + thr_args->pktio_dev);
> +
> + /* Lookup the packet pool */
> + pkt_pool = odp_buffer_pool_lookup("packet_pool");
> + if (pkt_pool == ODP_BUFFER_POOL_INVALID || pkt_pool != thr_args->pool) {
> + ODP_ERR(" [%02i] Error: pkt_pool not found\n", thr);
> + return NULL;
> + }
> +
> + /* Open a packet IO instance for this thread */
> + sock_params->type = thr_args->type;
> + sock_params->fanout = thr_args->fanout;
> + pktio = odp_pktio_open(thr_args->pktio_dev, pkt_pool, ¶ms);
> + if (pktio == ODP_PKTIO_INVALID) {
> + ODP_ERR(" [%02i] Error: pktio create failed.\n", thr);
> + return NULL;
> + }
> +
> + printf(" [%02i] created pktio:%02i, burst mode\n",
> + thr, pktio);
> +
> + pktio_ipc_params.type = ODP_PKTIO_TYPE_IPC;
> + pktio_ipc = odp_pktio_open(NULL, pkt_pool, &pktio_ipc_params);
> + if (pktio_ipc == ODP_PKTIO_INVALID) {
> + ODP_ERR(" [%02i] Error: pktio create failed\n", thr);
> + return NULL;
> + }
> +
> + qparam.sched.prio = ODP_SCHED_PRIO_DEFAULT;
> + qparam.sched.sync = ODP_SCHED_SYNC_ATOMIC;
> + qparam.sched.group = ODP_SCHED_GROUP_DEFAULT;
> + snprintf(inq_name, sizeof(inq_name), "%i-pktio_inq_def", (int)pktio);
> + inq_name[ODP_QUEUE_NAME_LEN - 1] = '\0';
> +
> + ipcq_def = odp_queue_create("shared-queue", ODP_QUEUE_TYPE_IPC,
> &qparam);
> + if (ipcq_def == ODP_QUEUE_INVALID) {
> + ODP_ERR(" [%02i] Error: pktio queue creation failed\n", thr);
> + return NULL;
> + }
> +
> + /* Loop packets */
> + for (;;) {
> + pkts = odp_pktio_recv(pktio, pkt_tbl, MAX_PKT_BURST);
> + if (pkts > 0) {
> + /* Drop packets with errors */
> + pkts_ok = drop_err_pkts(pkt_tbl, pkts);
> + if (pkts_ok > 0) {
> + ret = odp_queue_enq_multi(ipcq_def, pkt_tbl,
> pkts_ok);
> + pkt_cnt += pkts_ok;
> + if (ret != 0) {
> + ODP_ERR("odp_ring_mp_enqueue_bulk
> fail\n");
> + } else {
> + printf("[%d/%d] enqueue %d packets,
> first buf %d size %ld/%ld, cnt %lu\n",
> + getpid(), thr, pkts_ok,
> + pkt_tbl[0],
> + (unsigned
> long)odp_packet_get_len(pkt_tbl[0]),
> + (unsigned
> long)odp_buffer_size(pkt_tbl[0]),
> + pkt_cnt);
> + }
> + }
> +
> + if (odp_unlikely(pkts_ok != pkts))
> + ODP_ERR("Dropped frames:%u - err_cnt:%lu\n",
> + pkts-pkts_ok, ++err_cnt);
> + }
> + }
> +
> +/* unreachable */
> +}
> +
> +/**
> + * ODP packet example main function
> + */
> +int main(int argc, char *argv[])
> +{
> + odp_linux_pthread_t thread_tbl[MAX_WORKERS];
> + odp_buffer_pool_t pool;
> + int thr_id;
> + int num_workers;
> + void *pool_base;
> + int i;
> + int first_core;
> + int core_count;
> +
> + /* Init ODP before calling anything else */
> + if (odp_init_global()) {
> + ODP_ERR("Error: ODP global init failed.\n");
> + exit(EXIT_FAILURE);
> + }
> +
> + args = malloc(sizeof(args_t));
> + if (args == NULL) {
> + ODP_ERR("Error: shared mem alloc failed.\n");
> + exit(EXIT_FAILURE);
> + }
> + memset(args, 0, sizeof(*args));
> +
> + /* Parse and store the application arguments */
> + parse_args(argc, argv, &args->appl);
> +
> + /* Print both system and application information */
> + print_info(NO_PATH(argv[0]), &args->appl);
> +
> + core_count = odp_sys_core_count();
> + num_workers = core_count;
> +
> + if (args->appl.core_count)
> + num_workers = args->appl.core_count;
> +
> + if (num_workers > MAX_WORKERS)
> + num_workers = MAX_WORKERS;
> +
> + printf("Num worker threads: %i\n", num_workers);
> +
> + /*
> + * By default core #0 runs Linux kernel background tasks.
> + * Start mapping thread from core #1
> + */
> + first_core = 1;
> +
> + if (core_count == 1)
> + first_core = 0;
> +
> + printf("First core: %i\n\n", first_core);
> +
> + /* Init this thread */
> + thr_id = odp_thread_create(0);
> + odp_init_local(thr_id);
> +
> + /* Create packet pool */
> + pool_base = odp_shm_reserve("shm_packet_pool",
> + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE,
> + ODP_SHM_PROC);
> + if (pool_base == NULL) {
> + ODP_ERR("Error: packet pool mem alloc failed.\n");
> + exit(EXIT_FAILURE);
> + }
> +
> + pool = odp_buffer_pool_create("packet_pool", pool_base,
> + SHM_PKT_POOL_SIZE,
> + SHM_PKT_POOL_BUF_SIZE,
> + ODP_CACHE_LINE_SIZE,
> + ODP_BUFFER_TYPE_PACKET);
> + if (pool == ODP_BUFFER_POOL_INVALID) {
> + ODP_ERR("Error: packet pool create failed.\n");
> + exit(EXIT_FAILURE);
> + }
> + odp_buffer_pool_print(pool);
> +
> +
> + /* Create another process */
> + int f = fork();
> +
> + /* Create and init worker threads */
> + memset(thread_tbl, 0, sizeof(thread_tbl));
> + for (i = 0; i < num_workers; ++i) {
> + void *(*thr_run_func) (void *);
> + int core;
> + int if_idx;
> +
> + core = (first_core + i) % core_count;
> +
> + if_idx = i % args->appl.if_count;
> +
> + args->thread[i].pktio_dev = args->appl.if_names[if_idx];
> + args->thread[i].pool = pool;
> + args->thread[i].mode = args->appl.mode;
> + args->thread[i].type = args->appl.type;
> + args->thread[i].fanout = args->appl.fanout;
> + args->thread[i].tpid = f;
> +
> + if (f) {
> + thr_run_func = ring_thread;
> + } else {
> + if (args->appl.mode == APPL_MODE_PKT_BURST)
> + thr_run_func = pktio_ifburst_thread;
> + else /* APPL_MODE_PKT_QUEUE */
> + thr_run_func = pktio_queue_thread;
> + }
> + /*
> + * Create threads one-by-one instead of all-at-once,
> + * because each thread might get different arguments.
> + * Calls odp_thread_create(cpu) for each thread
> + */
> + odp_linux_pthread_create(thread_tbl, 1, core, thr_run_func,
> + &args->thread[i]);
> + }
> +
> + /* Master thread waits for other threads to exit */
> + odp_linux_pthread_join(thread_tbl, num_workers);
> +
> + printf("Exit\n\n");
> +
> + return 0;
> +}
> +
> +/**
> + * Drop packets which input parsing marked as containing errors.
> + *
> + * Frees packets with error and modifies pkt_tbl[] to only contain packets
> with
> + * no detected errors.
> + *
> + * @param pkt_tbl Array of packet
> + * @param len Length of pkt_tbl[]
> + *
> + * @return Number of packets with no detected error
> + */
> +static int drop_err_pkts(odp_packet_t pkt_tbl[], unsigned len)
> +{
> + odp_packet_t pkt;
> + unsigned pkt_cnt = len;
> + unsigned i, j;
> +
> + for (i = 0, j = 0; i < len; ++i) {
> + pkt = pkt_tbl[i];
> +
> + if (odp_unlikely(odp_packet_error(pkt))) {
> + odp_packet_free(pkt); /* Drop */
> + pkt_cnt--;
> + } else if (odp_unlikely(i != j++)) {
> + pkt_tbl[j-1] = pkt;
> + }
> + }
> +
> + return pkt_cnt;
> +}
> +
> +/**
> + * Parse and store the command line arguments
> + *
> + * @param argc argument count
> + * @param argv[] argument vector
> + * @param appl_args Store application arguments here
> + */
> +static void parse_args(int argc, char *argv[], appl_args_t *appl_args)
> +{
> + int opt;
> + int long_index;
> + char *names, *str, *token, *save;
> + int i;
> + int len;
> + static struct option longopts[] = {
> + {"count", required_argument, NULL, 'c'},
> + {"interface", required_argument, NULL, 'i'}, /* return 'i' */
> + {"mode", required_argument, NULL, 'm'}, /* return 'm' */
> + {"help", no_argument, NULL, 'h'}, /* return 'h' */
> + {NULL, 0, NULL, 0}
> + };
> +
> + appl_args->mode = -1; /* Invalid, must be changed by parsing */
> + appl_args->type = 3; /* 3: ODP_PKTIO_TYPE_SOCKET_MMAP */
> + appl_args->fanout = 1; /* turn off fanout by default for mmap */
> +
> + while (1) {
> + opt = getopt_long(argc, argv, "+c:i:m:t:f:h",
> + longopts, &long_index);
> +
> + if (opt == -1)
> + break; /* No more options */
> +
> + switch (opt) {
> + case 'c':
> + appl_args->core_count = atoi(optarg);
> + break;
> + /* parse packet-io interface names */
> + case 'i':
> + len = strlen(optarg);
> + if (len == 0) {
> + usage(argv[0]);
> + exit(EXIT_FAILURE);
> + }
> + len += 1; /* add room for '\0' */
> +
> + names = malloc(len);
> + if (names == NULL) {
> + usage(argv[0]);
> + exit(EXIT_FAILURE);
> + }
> +
> + /* count the number of tokens separated by ',' */
> + strcpy(names, optarg);
> + for (str = names, i = 0;; str = NULL, i++) {
> + token = strtok_r(str, ",", &save);
> + if (token == NULL)
> + break;
> + }
> + appl_args->if_count = i;
> +
> + if (appl_args->if_count == 0) {
> + usage(argv[0]);
> + exit(EXIT_FAILURE);
> + }
> +
> + /* allocate storage for the if names */
> + appl_args->if_names =
> + calloc(appl_args->if_count, sizeof(char *));
> +
> + /* store the if names (reset names string) */
> + strcpy(names, optarg);
> + for (str = names, i = 0;; str = NULL, i++) {
> + token = strtok_r(str, ",", &save);
> + if (token == NULL)
> + break;
> + appl_args->if_names[i] = token;
> + }
> + break;
> +
> + case 'm':
> + i = atoi(optarg);
> + if (i == 0)
> + appl_args->mode = APPL_MODE_PKT_BURST;
> + else
> + appl_args->mode = APPL_MODE_PKT_QUEUE;
> + break;
> +
> + case 't':
> + appl_args->type = atoi(optarg);
> + break;
> +
> + case 'f':
> + appl_args->fanout = atoi(optarg);
> + break;
> +
> + case 'h':
> + usage(argv[0]);
> + exit(EXIT_SUCCESS);
> + break;
> +
> + default:
> + break;
> + }
> + }
> +
> + if (appl_args->if_count == 0 || appl_args->mode == -1) {
> + usage(argv[0]);
> + exit(EXIT_FAILURE);
> + }
> +
> + optind = 1; /* reset 'extern optind' from the getopt lib */
> +}
> +
> +/**
> + * Print system and application info
> + */
> +static void print_info(char *progname, appl_args_t *appl_args)
> +{
> + int i;
> +
> + printf("\n"
> + "ODP system info\n"
> + "---------------\n"
> + "ODP API version: %s\n"
> + "CPU model: %s\n"
> + "CPU freq (hz): %"PRIu64"\n"
> + "Cache line size: %i\n"
> + "Core count: %i\n"
> + "\n",
> + odp_version_api_str(), odp_sys_cpu_model_str(), odp_sys_cpu_hz(),
> + odp_sys_cache_line_size(), odp_sys_core_count());
> +
> + printf("Running ODP appl: \"%s\"\n"
> + "-----------------\n"
> + "IF-count: %i\n"
> + "Using IFs: ",
> + progname, appl_args->if_count);
> + for (i = 0; i < appl_args->if_count; ++i)
> + printf(" %s", appl_args->if_names[i]);
> + printf("\n"
> + "Mode: ");
> + if (appl_args->mode == APPL_MODE_PKT_BURST)
> + PRINT_APPL_MODE(APPL_MODE_PKT_BURST);
> + else
> + PRINT_APPL_MODE(APPL_MODE_PKT_QUEUE);
> + printf("\n\n");
> + fflush(NULL);
> +}
> +
> +/**
> + * Prinf usage information
> + */
> +static void usage(char *progname)
> +{
> + printf("\n"
> + "Usage: %s OPTIONS\n"
> + " E.g. %s -i eth1,eth2,eth3 -m 0\n"
> + "\n"
> + "OpenDataPlane example application.\n"
> + "\n"
> + "Mandatory OPTIONS:\n"
> + " -i, --interface Eth interfaces (comma-separated, no spaces)\n"
> + " -m, --mode 0: Burst send&receive packets (no queues)\n"
> + " 1: Send&receive packets through ODP queues.\n"
> + " -t, --type 1: ODP_PKTIO_TYPE_SOCKET_BASIC\n"
> + " 2: ODP_PKTIO_TYPE_SOCKET_MMSG\n"
> + " 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n"
> + " 4: ODP_PKTIO_TYPE_NETMAP\n"
> + " Default: 3: ODP_PKTIO_TYPE_SOCKET_MMAP\n"
> + " -f, --fanout 0: off 1: on (Default 1: on)\n"
> + "\n"
> + "Optional OPTIONS\n"
> + " -c, --count <number> Core count.\n"
> + " -h, --help Display help and exit.\n"
> + "\n", NO_PATH(progname), NO_PATH(progname)
> + );
> +}
> diff --git a/example/l2fwd/odp_l2fwd.c b/example/l2fwd/odp_l2fwd.c
> index f89ea7a..3a78761 100644
> --- a/example/l2fwd/odp_l2fwd.c
> +++ b/example/l2fwd/odp_l2fwd.c
> @@ -294,7 +294,8 @@ int main(int argc, char *argv[])
> }
>
> /* Reserve memory for args from shared mem */
> - gbl_args = odp_shm_reserve("shm_args", sizeof(args_t),
> ODP_CACHE_LINE_SIZE);
> + gbl_args = odp_shm_reserve("shm_args", sizeof(args_t),
> + ODP_CACHE_LINE_SIZE, ODP_SHM_THREAD);
> if (gbl_args == NULL) {
> ODP_ERR("Error: shared mem alloc failed.\n");
> exit(EXIT_FAILURE);
> @@ -345,7 +346,8 @@ int main(int argc, char *argv[])
>
> /* Create packet pool */
> pool_base = odp_shm_reserve("shm_packet_pool",
> - SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE);
> + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE,
> + ODP_SHM_THREAD);
> if (pool_base == NULL) {
> ODP_ERR("Error: packet pool mem alloc failed.\n");
> exit(EXIT_FAILURE);
> diff --git a/example/odp_example/odp_example.c
> b/example/odp_example/odp_example.c
> index be96093..6d075b2 100644
> --- a/example/odp_example/odp_example.c
> +++ b/example/odp_example/odp_example.c
> @@ -987,7 +987,8 @@ int main(int argc, char *argv[])
> * Create message pool
> */
> pool_base = odp_shm_reserve("msg_pool",
> - MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE);
> + MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE,
> + ODP_SHM_THREAD);
>
> pool = odp_buffer_pool_create("msg_pool", pool_base, MSG_POOL_SIZE,
> sizeof(test_message_t),
> diff --git a/example/packet/odp_pktio.c b/example/packet/odp_pktio.c
> index 247a28a..c7ff4ef 100644
> --- a/example/packet/odp_pktio.c
> +++ b/example/packet/odp_pktio.c
> @@ -291,7 +291,8 @@ int main(int argc, char *argv[])
> }
>
> /* Reserve memory for args from shared mem */
> - args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE);
> + args = odp_shm_reserve("shm_args", sizeof(args_t), ODP_CACHE_LINE_SIZE,
> + ODP_SHM_THREAD);
> if (args == NULL) {
> ODP_ERR("Error: shared mem alloc failed.\n");
> exit(EXIT_FAILURE);
> @@ -332,7 +333,8 @@ int main(int argc, char *argv[])
>
> /* Create packet pool */
> pool_base = odp_shm_reserve("shm_packet_pool",
> - SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE);
> + SHM_PKT_POOL_SIZE, ODP_CACHE_LINE_SIZE,
> + ODP_SHM_THREAD);
> if (pool_base == NULL) {
> ODP_ERR("Error: packet pool mem alloc failed.\n");
> exit(EXIT_FAILURE);
> diff --git a/example/timer/odp_timer_test.c b/example/timer/odp_timer_test.c
> index dbe0e5b..113200b 100644
> --- a/example/timer/odp_timer_test.c
> +++ b/example/timer/odp_timer_test.c
> @@ -260,7 +260,8 @@ int main(int argc, char *argv[])
> * Create message pool
> */
> pool_base = odp_shm_reserve("msg_pool",
> - MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE);
> + MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE,
> + ODP_SHM_THREAD);
>
> pool = odp_buffer_pool_create("msg_pool", pool_base, MSG_POOL_SIZE,
> 0,
> diff --git a/include/helper/odp_ring.h b/include/helper/odp_ring.h
> index 0911531..60960a2 100644
> --- a/include/helper/odp_ring.h
> +++ b/include/helper/odp_ring.h
> @@ -158,6 +158,8 @@ typedef struct odp_ring {
>
> #define ODP_RING_F_SP_ENQ 0x0001 /* The default enqueue is
> "single-producer". */
> #define ODP_RING_F_SC_DEQ 0x0002 /* The default dequeue is
> "single-consumer". */
> +#define ODP_RING_SHM_PROC 0x0004 /* If set - ring is visible from different
> + processes. Default is thread visible. */
> #define ODP_RING_QUOT_EXCEED (1 << 31) /* Quota exceed for burst ops */
> #define ODP_RING_SZ_MASK (unsigned)(0x0fffffff) /* Ring size mask */
>
> diff --git a/include/odp_queue.h b/include/odp_queue.h
> index 5e083f1..4700a62 100644
> --- a/include/odp_queue.h
> +++ b/include/odp_queue.h
> @@ -44,6 +44,8 @@ typedef int odp_queue_type_t;
> #define ODP_QUEUE_TYPE_POLL 1 /**< Not scheduled queue */
> #define ODP_QUEUE_TYPE_PKTIN 2 /**< Packet input queue */
> #define ODP_QUEUE_TYPE_PKTOUT 3 /**< Packet output queue */
> +#define ODP_QUEUE_TYPE_IPC 4 /**< Packet ipc queue */
> +#define ODP_QUEUE_TYPE_IPC_LOOKUP 5 /**< Packet ipc queue */
>
> /**
> * ODP schedule priority
> diff --git a/include/odp_shared_memory.h b/include/odp_shared_memory.h
> index 8ac8847..1d8e8bb 100644
> --- a/include/odp_shared_memory.h
> +++ b/include/odp_shared_memory.h
> @@ -24,6 +24,13 @@ extern "C" {
> /** Maximum shared memory block name lenght in chars */
> #define ODP_SHM_NAME_LEN 32
>
> +typedef enum {
> + ODP_SHM_THREAD = 1, /**< Memory accessible by threads. */
> + ODP_SHM_PROC = 2, /**< Memory accessible by processes.
> + Will be created if not exist. */
> + ODP_SHM_PROC_NOCREAT = 3, /**< Memory accessible by processes.
> + Has to be created before usage.*/
> +} odp_shm_e;
>
> /**
> * Reserve a block of shared memory
> @@ -31,10 +38,12 @@ extern "C" {
> * @param name Name of the block (maximum ODP_SHM_NAME_LEN - 1 chars)
> * @param size Block size in bytes
> * @param align Block alignment in bytes
> + * @param flag Flags for shared memory creation
> *
> * @return Pointer to the reserved block, or NULL
> */
> -void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align);
> +void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
> + odp_shm_e flag);
>
> /**
> * Lookup for a block of shared memory
> @@ -44,6 +53,7 @@ void *odp_shm_reserve(const char *name, uint64_t size,
> uint64_t align);
> * @return Pointer to the block, or NULL
> */
> void *odp_shm_lookup(const char *name);
> +int odp_shm_lookup_ipc(const char *name);
>
>
> /**
> diff --git a/platform/linux-generic/include/api/odp_pktio_types.h
> b/platform/linux-generic/include/api/odp_pktio_types.h
> index 8d195a5..e8e27cc 100644
> --- a/platform/linux-generic/include/api/odp_pktio_types.h
> +++ b/platform/linux-generic/include/api/odp_pktio_types.h
> @@ -21,6 +21,7 @@ typedef enum {
> ODP_PKTIO_TYPE_SOCKET_MMSG,
> ODP_PKTIO_TYPE_SOCKET_MMAP,
> ODP_PKTIO_TYPE_NETMAP,
> + ODP_PKTIO_TYPE_IPC,
> } odp_pktio_type_t;
>
> #include <odp_pktio_socket.h>
> diff --git a/platform/linux-generic/include/odp_packet_io_internal.h
> b/platform/linux-generic/include/odp_packet_io_internal.h
> index 881cc5f..77fff96 100644
> --- a/platform/linux-generic/include/odp_packet_io_internal.h
> +++ b/platform/linux-generic/include/odp_packet_io_internal.h
> @@ -35,6 +35,7 @@ struct pktio_entry {
> #ifdef ODP_HAVE_NETMAP
> pkt_netmap_t pkt_nm; /**< using netmap API for IO */
> #endif
> + odp_buffer_pool_t pool; /**< reference to packet pool */
> };
>
> typedef union {
> diff --git a/platform/linux-generic/include/odp_queue_internal.h
> b/platform/linux-generic/include/odp_queue_internal.h
> index 8b6c517..f536331 100644
> --- a/platform/linux-generic/include/odp_queue_internal.h
> +++ b/platform/linux-generic/include/odp_queue_internal.h
> @@ -23,6 +23,7 @@ extern "C" {
> #include <odp_packet_io.h>
> #include <odp_align.h>
>
> +#include <helper/odp_ring.h>
>
> #define USE_TICKETLOCK
>
> @@ -39,6 +40,8 @@ extern "C" {
> #define QUEUE_STATUS_NOTSCHED 2
> #define QUEUE_STATUS_SCHED 3
>
> +#define QUEUE_IPC_ENTRIES 4096 /**< number of odp buffers in odp ring
> queue */
> +
> /* forward declaration */
> union queue_entry_u;
>
> @@ -65,13 +68,14 @@ struct queue_entry_s {
> deq_func_t dequeue;
> enq_multi_func_t enqueue_multi;
> deq_multi_func_t dequeue_multi;
> -
> odp_queue_t handle;
> odp_buffer_t sched_buf;
> odp_queue_type_t type;
> odp_queue_param_t param;
> odp_pktio_t pktin;
> odp_pktio_t pktout;
> + odp_ring_t *r; /* ring ref */
> + odp_buffer_t **r_p; /* ring memory */
> char name[ODP_QUEUE_NAME_LEN];
> };
>
> @@ -84,10 +88,16 @@ typedef union queue_entry_u {
> queue_entry_t *get_qentry(uint32_t queue_id);
>
> int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
> +int queue_enq_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr);
> +
> odp_buffer_hdr_t *queue_deq(queue_entry_t *queue);
> +odp_buffer_hdr_t *queue_deq_ipc(queue_entry_t *queue);
>
> int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int
> num);
> +int queue_enq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
> int num);
> +
> int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int
> num);
> +int queue_deq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
> int num);
>
> void queue_lock(queue_entry_t *queue);
> void queue_unlock(queue_entry_t *queue);
> diff --git a/platform/linux-generic/odp_buffer_pool.c
> b/platform/linux-generic/odp_buffer_pool.c
> index a48781a..9157994 100644
> --- a/platform/linux-generic/odp_buffer_pool.c
> +++ b/platform/linux-generic/odp_buffer_pool.c
> @@ -102,7 +102,8 @@ int odp_buffer_pool_init_global(void)
>
> pool_tbl = odp_shm_reserve("odp_buffer_pools",
> sizeof(pool_table_t),
> - sizeof(pool_entry_t));
> + sizeof(pool_entry_t),
> + ODP_SHM_THREAD);
>
> if (pool_tbl == NULL)
> return -1;
> diff --git a/platform/linux-generic/odp_packet_io.c
> b/platform/linux-generic/odp_packet_io.c
> index 33ade10..6672d6b 100644
> --- a/platform/linux-generic/odp_packet_io.c
> +++ b/platform/linux-generic/odp_packet_io.c
> @@ -55,7 +55,8 @@ int odp_pktio_init_global(void)
>
> pktio_tbl = odp_shm_reserve("odp_pktio_entries",
> sizeof(pktio_table_t),
> - sizeof(pktio_entry_t));
> + sizeof(pktio_entry_t),
> + ODP_SHM_THREAD);
> if (pktio_tbl == NULL)
> return -1;
>
> @@ -129,6 +130,8 @@ static void init_pktio_entry(pktio_entry_t *entry,
> odp_pktio_params_t *params)
> memset(&entry->s.pkt_nm, 0, sizeof(entry->s.pkt_nm));
> break;
> #endif
> + case ODP_PKTIO_TYPE_IPC:
> + break;
> default:
> ODP_ERR("Packet I/O type not supported. Please recompile\n");
> break;
> @@ -194,6 +197,8 @@ odp_pktio_t odp_pktio_open(const char *dev,
> odp_buffer_pool_t pool,
> ODP_DBG("Allocating netmap pktio\n");
> break;
> #endif
> + case ODP_PKTIO_TYPE_IPC:
> + break;
> default:
> ODP_ERR("Invalid pktio type: %02x\n", params->type);
> return ODP_PKTIO_INVALID;
> @@ -239,6 +244,9 @@ odp_pktio_t odp_pktio_open(const char *dev,
> odp_buffer_pool_t pool,
> }
> break;
> #endif
> + case ODP_PKTIO_TYPE_IPC:
> + pktio_entry->s.pool = pool;
> + break;
> default:
> free_pktio_entry(id);
> id = ODP_PKTIO_INVALID;
> @@ -381,11 +389,23 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t
> queue)
> pktio_entry_t *pktio_entry = get_entry(id);
> queue_entry_t *qentry = queue_to_qentry(queue);
>
> - if (pktio_entry == NULL || qentry == NULL)
> + if (pktio_entry == NULL || qentry == NULL) {
> + ODP_ERR("%s() return -q reason %p -- %p\n",
> + __func__,
> + pktio_entry, qentry);
> return -1;
> + }
>
> - if (qentry->s.type != ODP_QUEUE_TYPE_PKTIN)
> + switch (qentry->s.type)
> + {
> + case ODP_QUEUE_TYPE_PKTIN:
> + case ODP_QUEUE_TYPE_IPC:
> + case ODP_QUEUE_TYPE_IPC_LOOKUP:
> + break;
> + default:
> + ODP_ERR("%s() type is %d\n", __func__, qentry->s.type);
> return -1;
> + }
>
> lock_entry(pktio_entry);
> pktio_entry->s.inq_default = queue;
> @@ -396,6 +416,12 @@ int odp_pktio_inq_setdef(odp_pktio_t id, odp_queue_t
> queue)
> qentry->s.status = QUEUE_STATUS_SCHED;
> queue_unlock(qentry);
>
> + if (qentry->s.type == ODP_QUEUE_TYPE_IPC)
> + return 0;
> + if (qentry->s.type == ODP_QUEUE_TYPE_IPC_LOOKUP)
> + return 0;
> +
> +
> odp_schedule_queue(queue, qentry->s.param.sched.prio);
>
> return 0;
> diff --git a/platform/linux-generic/odp_queue.c
> b/platform/linux-generic/odp_queue.c
> index c637bdf..d08e72e 100644
> --- a/platform/linux-generic/odp_queue.c
> +++ b/platform/linux-generic/odp_queue.c
> @@ -21,6 +21,10 @@
> #include <odp_hints.h>
> #include <odp_sync.h>
>
> +#include <helper/odp_ring.h>
> +#include <sys/types.h>
> +#include <unistd.h>
> +
> #ifdef USE_TICKETLOCK
> #include <odp_ticketlock.h>
> #define LOCK(a) odp_ticketlock_lock(a)
> @@ -34,7 +38,7 @@
> #endif
>
> #include <string.h>
> -
> +#include <stdlib.h>
>
> typedef struct queue_table_t {
> queue_entry_t queue[ODP_CONFIG_QUEUES];
> @@ -77,6 +81,41 @@ static void queue_init(queue_entry_t *queue, const char
> *name,
> queue->s.enqueue_multi = pktout_enq_multi;
> queue->s.dequeue_multi = pktout_deq_multi;
> break;
> + case ODP_QUEUE_TYPE_IPC:
> + queue->s.r = odp_ring_lookup(name);
> + queue->s.r_p = (odp_buffer_t **)malloc(QUEUE_IPC_ENTRIES *
> + sizeof(odp_buffer_t));
> + if (!queue->s.r)
> + {
> + queue->s.r = odp_ring_create(name, QUEUE_IPC_ENTRIES,
> ODP_RING_SHM_PROC);
> + if (queue->s.r == NULL) {
> + ODP_ERR("ring create failed\n");
> + }
> + }
> + queue->s.enqueue = queue_enq_ipc;
> + queue->s.dequeue = queue_deq_ipc;
> + queue->s.enqueue_multi = queue_enq_multi_ipc;
> + queue->s.dequeue_multi = queue_deq_multi_ipc;
> + break;
> + case ODP_QUEUE_TYPE_IPC_LOOKUP:
> + queue->s.r_p = (odp_buffer_t **)malloc(QUEUE_IPC_ENTRIES *
> + sizeof(odp_buffer_t));
> + if (odp_shm_lookup_ipc(name) == 1)
> + {
> + size_t ring_size = QUEUE_IPC_ENTRIES * sizeof(void *)
> + + sizeof(odp_ring_t);
> + queue->s.r = odp_shm_reserve(name, ring_size,
> + ODP_CACHE_LINE_SIZE,
> + ODP_SHM_PROC_NOCREAT);
> + if (queue->s.r == NULL) {
> + ODP_ERR("LOOKUP ring create failed\n");
> + }
> + }
> + queue->s.enqueue = queue_enq_ipc;
> + queue->s.dequeue = queue_deq_ipc;
> + queue->s.enqueue_multi = queue_enq_multi_ipc;
> + queue->s.dequeue_multi = queue_deq_multi_ipc;
> + break;
> default:
> queue->s.enqueue = queue_enq;
> queue->s.dequeue = queue_deq;
> @@ -99,7 +138,8 @@ int odp_queue_init_global(void)
>
> queue_tbl = odp_shm_reserve("odp_queues",
> sizeof(queue_table_t),
> - sizeof(queue_entry_t));
> + sizeof(queue_entry_t),
> + ODP_SHM_THREAD);
>
> if (queue_tbl == NULL)
> return -1;
> @@ -113,6 +153,11 @@ int odp_queue_init_global(void)
> queue->s.handle = queue_from_id(i);
> }
>
> + /* For linux-generic, the IPC queue is implemented totally in
> + * software using odp_ring.
> + */
> + odp_ring_tailq_init();
> +
> ODP_DBG("done\n");
> ODP_DBG("Queue init global\n");
> ODP_DBG(" struct queue_entry_s size %zu\n",
> @@ -243,6 +288,27 @@ odp_queue_t odp_queue_lookup(const char *name)
> UNLOCK(&queue->s.lock);
> }
>
> + /* look up the shared memory object; if it exists, return a queue for it */
> + odp_ring_t *r;
> +
> + r = odp_ring_lookup(name);
> + if (r == NULL) {
> + if ( odp_shm_lookup_ipc(name) == 1) {
> + /* Create local IPC queue connected to shm object */
> + odp_queue_t q = odp_queue_create(name,
> + ODP_QUEUE_TYPE_IPC_LOOKUP,
> NULL);
> + if (q != ODP_QUEUE_INVALID) {
> + return q;
> + }
> + }
> + } else {
> + /* The odp ring is in odp_ring_list. That means the current process
> already created
> + * a link with this name. That might be an ipc queue or the ring itself.
> For now
> + * print an error here.
> + */
> + */
> + ODP_ERR("odp ring with name: \"%s\" already initialized\n",
> name);
> + }
> +
> return ODP_QUEUE_INVALID;
> }
>
> @@ -276,6 +342,38 @@ int queue_enq(queue_entry_t *queue, odp_buffer_hdr_t
> *buf_hdr)
> return 0;
> }
>
> +int queue_enq_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr)
> +{
> + int ret;
> + odp_ring_t *r = queue->s.r;
> + odp_buffer_bits_t handle;
> + uint32_t index = buf_hdr->handle.index;
> + uint32_t pool_id = buf_hdr->handle.pool;
> + odp_buffer_t buf;
> + void **rbuf_p;
> +
> + /* get buffer from buf_hdr */
> + handle.index = index;
> + handle.pool = pool_id;
> +
> + buf = handle.u32;
> +
> + rbuf_p = (void*)&buf;
> + /* use odp_ring locks instead of per process queue lock
> + * LOCK(&queue->s.lock);
> + */
> + /* queue buffer to the ring. Note: we can't use a pointer to buf_hdr
> + * here because the pointer would be dereferenced in a different process
> + */
> + ret = odp_ring_mp_enqueue_bulk(r, rbuf_p, 1);
> + if (ret != 0)
> + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n");
> + /*
> + * UNLOCK(&queue->s.lock);
> + */
> + return 0;
> +}
> +
>
> int queue_enq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int
> num)
> {
> @@ -311,6 +409,43 @@ int queue_enq_multi(queue_entry_t *queue,
> odp_buffer_hdr_t *buf_hdr[], int num)
> return 0;
> }
>
> +int queue_enq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
> int num)
> +{
> + int i;
> + int ret = 0;
> + odp_ring_t *r = queue->s.r;
> + odp_buffer_bits_t handle;
> + odp_buffer_t buf;
> + void **rbuf_p;
> +
> + /* use odp_ring locks instead of per process queue lock
> + * LOCK(&queue->s.lock);
> + */
> +
> + /* odp_buffer_t buffers may not be in contiguous memory,
> + * so queue them to the IPC ring one by one.
> + */
> + for (i = 0; i < num; i++) {
> + handle.index = buf_hdr[i]->handle.index;
> + handle.pool = buf_hdr[i]->handle.pool;
> +
> + buf = handle.u32;
> +
> + rbuf_p = (void*)&buf;
> +
> + /* queue buffer to the ring. Note: we can't use a pointer to
> buf_hdr
> + * here because the pointer would be dereferenced in a different process
> + */
> + ret += odp_ring_mp_enqueue_bulk(r, rbuf_p, 1);
> + if (ret != 0)
> + ODP_ERR("odp_ring_mp_enqueue_bulk fail\n");
> + }
> + /*
> + * UNLOCK(&queue->s.lock);
> + */
> +
> + return ret;
> +}
>
> int odp_queue_enq_multi(odp_queue_t handle, odp_buffer_t buf[], int num)
> {
> @@ -369,6 +504,72 @@ odp_buffer_hdr_t *queue_deq(queue_entry_t *queue)
> return buf_hdr;
> }
>
> +odp_buffer_hdr_t *queue_deq_ipc(queue_entry_t *queue)
> +{
> + odp_buffer_hdr_t *buf_hdr = NULL;
> + odp_ring_t *r = queue->s.r;
> + int ret;
> + odp_buffer_t buf;
> +
> + /* using odp_ring lock
> + * LOCK(&queue->s.lock);
> + */
> + ret = odp_ring_mc_dequeue_bulk(r, (void **)queue->s.r_p, 1);
> + if (ret == 0) {
> + memcpy(&buf, (void *)queue->s.r_p,
> + sizeof(odp_buffer_t));
> + buf_hdr = odp_buf_to_hdr(buf);
> + }
> + /*
> + * UNLOCK(&queue->s.lock);
> + */
> +
> + return buf_hdr;
> +}
> +
> +int queue_deq_multi_ipc(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[],
> int num)
> +{
> + int i = 0;
> + odp_ring_t *r = queue->s.r;
> + int ret;
> + odp_buffer_t buf;
> +
> + /* use odp ring lock
> + * LOCK(&queue->s.lock);
> + */
> +
> + if (queue->s.head == NULL) {
> + /* Already empty queue */
> + } else {
> + odp_buffer_hdr_t *hdr = queue->s.head;
> +
> + ret = odp_ring_mc_dequeue_bulk(r, (void **)queue->s.r_p, num);
> + if (ret == 0) {
> + for (; i < num && hdr; i++) {
> + memcpy(&buf, (void *)queue->s.r_p[i],
> + sizeof(odp_buffer_t));
> +
> + buf_hdr[i] = odp_buf_to_hdr(buf);
> + hdr = hdr->next;
> + buf_hdr[i]->next = NULL;
> + }
> +
> + }
> +
> + queue->s.head = hdr;
> +
> + if (hdr == NULL) {
> + /* Queue is now empty */
> + queue->s.tail = NULL;
> + }
> + }
> +
> + /* use odp_ring lock
> + * UNLOCK(&queue->s.lock);
> + */
> +
> + return i;
> +}
>
> int queue_deq_multi(queue_entry_t *queue, odp_buffer_hdr_t *buf_hdr[], int
> num)
> {
> diff --git a/platform/linux-generic/odp_ring.c
> b/platform/linux-generic/odp_ring.c
> index 25ff66a..40df789 100644
> --- a/platform/linux-generic/odp_ring.c
> +++ b/platform/linux-generic/odp_ring.c
> @@ -82,6 +82,9 @@
> #include <odp_rwlock.h>
> #include <helper/odp_ring.h>
>
> +#include <sys/types.h>
> +#include <unistd.h>
> +
> static TAILQ_HEAD(, odp_ring) odp_ring_list;
>
> /*
> @@ -158,6 +161,12 @@ odp_ring_create(const char *name, unsigned count,
> unsigned flags)
> char ring_name[ODP_RING_NAMESIZE];
> odp_ring_t *r;
> size_t ring_size;
> + odp_shm_e shm_flag;
> +
> + if (flags & ODP_RING_SHM_PROC)
> + shm_flag = ODP_SHM_PROC;
> + else
> + shm_flag = ODP_SHM_THREAD;
>
> /* count must be a power of 2 */
> if (!ODP_VAL_IS_POWER_2(count) || (count > ODP_RING_SZ_MASK)) {
> @@ -171,7 +180,8 @@ odp_ring_create(const char *name, unsigned count,
> unsigned flags)
>
> odp_rwlock_write_lock(&qlock);
> /* reserve a memory zone for this ring.*/
> - r = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE);
> + r = odp_shm_reserve(ring_name, ring_size, ODP_CACHE_LINE_SIZE,
> + shm_flag);
>
> if (r != NULL) {
> /* init the ring structure */
> @@ -545,12 +555,14 @@ void odp_ring_list_dump(void)
> /* search a ring from its name */
> odp_ring_t *odp_ring_lookup(const char *name)
> {
> - odp_ring_t *r = odp_shm_lookup(name);
> + odp_ring_t *r;
>
> odp_rwlock_read_lock(&qlock);
> TAILQ_FOREACH(r, &odp_ring_list, next) {
> - if (strncmp(name, r->name, ODP_RING_NAMESIZE) == 0)
> - break;
> + if (strncmp(name, r->name, ODP_RING_NAMESIZE) == 0) {
> + odp_rwlock_read_unlock(&qlock);
> + return r;
> + }
> }
> odp_rwlock_read_unlock(&qlock);
>
> diff --git a/platform/linux-generic/odp_schedule.c
> b/platform/linux-generic/odp_schedule.c
> index 9e399f1..9e76692 100644
> --- a/platform/linux-generic/odp_schedule.c
> +++ b/platform/linux-generic/odp_schedule.c
> @@ -89,7 +89,8 @@ int odp_schedule_init_global(void)
>
> sched = odp_shm_reserve("odp_scheduler",
> sizeof(sched_t),
> - ODP_CACHE_LINE_SIZE);
> + ODP_CACHE_LINE_SIZE,
> + ODP_SHM_THREAD);
>
> if (sched == NULL) {
> ODP_ERR("Schedule init: Shm reserve failed.\n");
> @@ -98,7 +99,8 @@ int odp_schedule_init_global(void)
>
>
> pool_base = odp_shm_reserve("odp_sched_pool",
> - SCHED_POOL_SIZE, ODP_CACHE_LINE_SIZE);
> + SCHED_POOL_SIZE, ODP_CACHE_LINE_SIZE,
> + ODP_SHM_THREAD);
>
> pool = odp_buffer_pool_create("odp_sched_pool", pool_base,
> SCHED_POOL_SIZE, sizeof(queue_desc_t),
> diff --git a/platform/linux-generic/odp_shared_memory.c
> b/platform/linux-generic/odp_shared_memory.c
> index 784f42b..ae53bb6 100644
> --- a/platform/linux-generic/odp_shared_memory.c
> +++ b/platform/linux-generic/odp_shared_memory.c
> @@ -14,10 +14,14 @@
> #include <sys/mman.h>
> #include <asm/mman.h>
> #include <fcntl.h>
> +#include <unistd.h>
> +#include <sys/types.h>
>
> #include <stdio.h>
> #include <string.h>
>
> +#include <helper/odp_ring.h>
> +#include <stdlib.h>
>
> #define ODP_SHM_NUM_BLOCKS 32
>
> @@ -59,9 +63,8 @@ int odp_shm_init_global(void)
> ODP_DBG("NOTE: mmap does not support huge pages\n");
> #endif
>
> - addr = mmap(NULL, sizeof(odp_shm_table_t),
> - PROT_READ | PROT_WRITE, SHM_FLAGS, -1, 0);
> -
> + /* malloc instead of mmap to bind table to process. */
> + addr = malloc(sizeof(odp_shm_table_t));
> if (addr == MAP_FAILED)
> return -1;
>
> @@ -95,9 +98,12 @@ static int find_block(const char *name)
> }
>
>
> -void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align)
> +void *odp_shm_reserve(const char *name, uint64_t size, uint64_t align,
> + odp_shm_e flag)
> {
> - int i;
> + int i, ret, shm_open_flags;
> + int shm = -1;
> + int mmap_flags = MAP_SHARED;
> odp_shm_block_t *block;
> void *addr;
> #ifdef MAP_HUGETLB
> @@ -107,8 +113,20 @@ void *odp_shm_reserve(const char *name, uint64_t size,
> uint64_t align)
> page_sz = odp_sys_page_size();
> #endif
>
> + printf("pid %d: %s() %s size %lld, flag %d\n",
> + getpid(), __func__, name, (unsigned long long)size, flag);
> +
> odp_spinlock_lock(&odp_shm_tbl->lock);
>
> + /* if object was already created return its address */
> + if (flag == ODP_SHM_PROC_NOCREAT) {
> + for (i = 0; i < ODP_SHM_NUM_BLOCKS; i++) {
> + if (strcmp(name, odp_shm_tbl->block[i].name) == 0) {
> + return odp_shm_tbl->block[i].addr;
> + }
> + }
> + }
> +
> if (find_block(name) >= 0) {
> /* Found a block with the same name */
> odp_spinlock_unlock(&odp_shm_tbl->lock);
> @@ -123,8 +141,8 @@ void *odp_shm_reserve(const char *name, uint64_t size,
> uint64_t align)
> }
>
> if (i > ODP_SHM_NUM_BLOCKS - 1) {
> - /* Table full */
> odp_spinlock_unlock(&odp_shm_tbl->lock);
> + ODP_ERR("ODP_SHM_NUM_BLOCKS table is full");
> return NULL;
> }
>
> @@ -133,19 +151,42 @@ void *odp_shm_reserve(const char *name, uint64_t size,
> uint64_t align)
> addr = MAP_FAILED;
> block->huge = 0;
>
> + if (flag != ODP_SHM_THREAD) {
> + shm_open_flags = O_RDWR;
> + if (flag == ODP_SHM_PROC)
> + shm_open_flags |= O_CREAT;
> +
> + shm = shm_open(name, shm_open_flags, S_IRUSR | S_IWUSR);
> + if (shm == -1) {
> + odp_spinlock_unlock(&odp_shm_tbl->lock);
> + ODP_ERR("shm_open failed");
> + return NULL;
> + }
> +
> + ret = ftruncate(shm, size + align);
> + if (ret == -1) {
> + odp_spinlock_unlock(&odp_shm_tbl->lock);
> + if (flag != ODP_SHM_PROC_NOCREAT)
> + shm_unlink(name);
> + ODP_ERR("ftruncate failed");
> + return NULL;
> + }
> + } else {
> + mmap_flags |= MAP_ANONYMOUS;
> + }
> +
> #ifdef MAP_HUGETLB
> /* Try first huge pages */
> if (huge_sz && (size + align) > page_sz) {
> addr = mmap(NULL, size + align, PROT_READ | PROT_WRITE,
> - SHM_FLAGS | MAP_HUGETLB, -1, 0);
> + mmap_flags | MAP_HUGETLB, shm, 0);
> }
> #endif
>
> /* Use normal pages for small or failed huge page allocations */
> if (addr == MAP_FAILED) {
> addr = mmap(NULL, size + align, PROT_READ | PROT_WRITE,
> - SHM_FLAGS, -1, 0);
> -
> + mmap_flags, shm, 0);
> } else {
> block->huge = 1;
> }
> @@ -153,6 +194,9 @@ void *odp_shm_reserve(const char *name, uint64_t size,
> uint64_t align)
> if (addr == MAP_FAILED) {
> /* Alloc failed */
> odp_spinlock_unlock(&odp_shm_tbl->lock);
> + if (flag != ODP_SHM_PROC_NOCREAT)
> + shm_unlink(name);
> + ODP_ERR("MAP_FAILED\n");
> return NULL;
> }
>
> @@ -192,6 +236,17 @@ void *odp_shm_lookup(const char *name)
> return addr;
> }
>
> +int odp_shm_lookup_ipc(const char *name)
> +{
> + int shm;
> +
> + shm = shm_open(name, O_RDWR, S_IRUSR | S_IWUSR);
> + if (shm == -1)
> + return 0;
> +
> + close(shm);
> + return 1;
> +}
>
> void odp_shm_print_all(void)
> {
> diff --git a/test/api_test/odp_shm_test.c b/test/api_test/odp_shm_test.c
> index 318d662..fc448a4 100644
> --- a/test/api_test/odp_shm_test.c
> +++ b/test/api_test/odp_shm_test.c
> @@ -47,7 +47,8 @@ int main(int argc ODP_UNUSED, char *argv[] ODP_UNUSED)
> odp_print_system_info();
>
> test_shared_data = odp_shm_reserve("test_shared_data",
> - sizeof(test_shared_data_t), 128);
> + sizeof(test_shared_data_t), 128,
> + ODP_SHM_THREAD);
> memset(test_shared_data, 0, sizeof(test_shared_data_t));
> printf("test shared data at %p\n\n", test_shared_data);
>
> diff --git a/test/api_test/odp_timer_ping.c b/test/api_test/odp_timer_ping.c
> index c1cc255..88f830f 100644
> --- a/test/api_test/odp_timer_ping.c
> +++ b/test/api_test/odp_timer_ping.c
> @@ -328,7 +328,8 @@ int main(int argc ODP_UNUSED, char *argv[] ODP_UNUSED)
> * Create message pool
> */
> pool_base = odp_shm_reserve("msg_pool",
> - MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE);
> + MSG_POOL_SIZE, ODP_CACHE_LINE_SIZE,
> + ODP_SHM_THREAD);
>
> pool = odp_buffer_pool_create("msg_pool", pool_base, MSG_POOL_SIZE,
> BUF_SIZE,
> --
> 1.8.5.1.163.gd7aced9
>
>
> _______________________________________________
> lng-odp mailing list
> [email protected]
> http://lists.linaro.org/mailman/listinfo/lng-odp
--
Anders Roxell
[email protected]
M: +46 709 71 42 85 | IRC: roxell
_______________________________________________
lng-odp mailing list
[email protected]
http://lists.linaro.org/mailman/listinfo/lng-odp