The idea behind this patch looks good; however, it really should be split
into at least two patches, since the changes to pktio that exploit the
change are independent of the base functional change.

I did notice, however, that test/performance/odp_scheduling shows a number
of issues, both functional and performance-related, which indicate further
work is needed.

Prior to applying this patch, odp_scheduling gives these sorts of numbers:

  [2] alloc_sng alloc+free       47 CPU cycles
  [3] alloc_sng alloc+free       50 CPU cycles
  [1] alloc_sng alloc+free       52 CPU cycles
  [4] alloc_sng alloc+free       52 CPU cycles
  [4] alloc_multi alloc+free     95 CPU cycles
  [1] alloc_multi alloc+free     99 CPU cycles
  [3] alloc_multi alloc+free    100 CPU cycles
  [2] alloc_multi alloc+free    100 CPU cycles
  [3] plain_queue enq+deq      3210 CPU cycles
  [1] plain_queue enq+deq      3210 CPU cycles
  [2] plain_queue enq+deq      3210 CPU cycles
  [4] plain_queue enq+deq      3210 CPU cycles
  [3] sched_____s_lo enq+deq   1835 CPU cycles
  [1] sched_____s_lo enq+deq   1889 CPU cycles
  [2] sched_____s_lo enq+deq   1705 CPU cycles
  [4] sched_____s_lo enq+deq   1866 CPU cycles
  [3] sched_____m_lo enq+deq    474 CPU cycles
  [2] sched_____m_lo enq+deq    483 CPU cycles
  [4] sched_____m_lo enq+deq    480 CPU cycles
  [1] sched_____m_lo enq+deq    481 CPU cycles
  [4] sched_multi_lo enq+deq    461 CPU cycles
  [2] sched_multi_lo enq+deq    462 CPU cycles
  [3] sched_multi_lo enq+deq    454 CPU cycles
  [1] sched_multi_lo enq+deq    462 CPU cycles
  [3] sched_____s_hi enq+deq   1306 CPU cycles
  [1] sched_____s_hi enq+deq    887 CPU cycles
  [4] sched_____s_hi enq+deq   1045 CPU cycles
  [2] sched_____s_hi enq+deq   1338 CPU cycles
  [2] sched_____m_hi enq+deq    175 CPU cycles
  [3] sched_____m_hi enq+deq    170 CPU cycles
  [4] sched_____m_hi enq+deq    166 CPU cycles
  [1] sched_____m_hi enq+deq    170 CPU cycles
  [2] sched_multi_hi enq+deq    134 CPU cycles
  [4] sched_multi_hi enq+deq    136 CPU cycles
  [1] sched_multi_hi enq+deq    135 CPU cycles
  [3] sched_multi_hi enq+deq    133 CPU cycles
Thread 3 exits
Thread 1 exits
Thread 4 exits
Thread 2 exits
ODP example complete

With this patch applied, however, I see the following:

  [4] alloc_sng alloc+free       79 CPU cycles
  [1] alloc_sng alloc+free       82 CPU cycles
  [2] alloc_sng alloc+free       83 CPU cycles
  [3] alloc_sng alloc+free       83 CPU cycles
  [1] alloc_multi alloc+free    142 CPU cycles
  [4] alloc_multi alloc+free    146 CPU cycles
  [2] alloc_multi alloc+free    146 CPU cycles
  [3] alloc_multi alloc+free    148 CPU cycles
  [1] plain_queue enq+deq      3283 CPU cycles
  [4] plain_queue enq+deq      3283 CPU cycles
  [3] plain_queue enq+deq      3283 CPU cycles
  [2] plain_queue enq+deq      3284 CPU cycles
  [4] sched_____s_lo enq+deq   1908 CPU cycles
  [3] sched_____s_lo enq+deq   1895 CPU cycles
  [2] sched_____s_lo enq+deq   1758 CPU cycles
  [1] sched_____s_lo enq+deq   1929 CPU cycles
  [1] sched_____m_lo enq+deq    494 CPU cycles
  [3] sched_____m_lo enq+deq    495 CPU cycles
  [4] sched_____m_lo enq+deq    494 CPU cycles
  [2] sched_____m_lo enq+deq    497 CPU cycles
  [4] sched_multi_lo enq+deq    457 CPU cycles
  [1] sched_multi_lo enq+deq    456 CPU cycles
  [3] sched_multi_lo enq+deq    446 CPU cycles
  [2] sched_multi_lo enq+deq    456 CPU cycles
  [2] sched_____s_hi enq+deq   1321 CPU cycles
  [1] sched_____s_hi enq+deq    869 CPU cycles
  [4] sched_____s_hi enq+deq   1058 CPU cycles
  [3] sched_____s_hi enq+deq   1280 CPU cycles
  [2] sched_____m_hi enq+deq    183 CPU cycles
  [3] sched_____m_hi enq+deq    171 CPU cycles
  [4] sched_____m_hi enq+deq    183 CPU cycles
  [1] sched_____m_hi enq+deq    178 CPU cycles
  [2] sched_multi_hi enq+deq    142 CPU cycles
  [4] sched_multi_hi enq+deq    144 CPU cycles
  [3] sched_multi_hi enq+deq    135 CPU cycles
  [1] sched_multi_hi enq+deq    139 CPU cycles
Thread 1 exits
Thread 2 exits
Thread 4 exits
Thread 3 exits
ODP example complete

odp_queue.c:328:odp_queue_destroy():queue "sched_00_30" not empty
odp_queue.c:328:odp_queue_destroy():queue "sched_00_58" not empty
odp_schedule.c:271:schedule_term_global():Queue not empty
odp_schedule.c:271:schedule_term_global():Queue not empty
odp_schedule.c:294:schedule_term_global():Pool destroy fail.
odp_init.c:188:_odp_term_global():ODP schedule term failed.
odp_queue.c:170:odp_queue_term_global():Not destroyed queue: sched_00_30
odp_queue.c:170:odp_queue_term_global():Not destroyed queue: sched_00_58
odp_init.c:195:_odp_term_global():ODP queue term failed.
odp_pool.c:149:odp_pool_term_global():Not destroyed pool: odp_sched_pool
odp_pool.c:149:odp_pool_term_global():Not destroyed pool: msg_pool
odp_init.c:202:_odp_term_global():ODP buffer pool term failed.

---

While a few of these numbers are improved, most are worse. More
importantly, the test also fails to terminate cleanly, which suggests that
the local caches are not being flushed completely at termination.

On Fri, Jul 8, 2016 at 7:08 AM, Petri Savolainen <[email protected]
> wrote:

> Optimize local buffer cache performance which is critical to
> many use cases - including packet IO. For example, l2fwd test
> application packet throughput is increased about 10% (with dpdk
> pktio).
>
> Main parts of the optimization are:
>  * Local cache implemented as an array of buf_hdr pointers,
>    instead of a linked list (which causes a lot of cache misses)
>  * Alloc and free N buffers per operation
>  * Modify dpdk pktio to take advantage of multi-alloc/free.
>    Other pktios still alloc/free one packet at a time.
>
> All of the above steps are needed to demonstrate the performance upgrade.
> Some related pool functions (get_buf(), ret_buf(), etc.) were moved
> from the pool header to the C source file, since they were actually
> local to it. Some unused pool variables are also removed.
>
> Signed-off-by: Petri Savolainen <[email protected]>
> ---
>  .../linux-generic/include/odp_buffer_inlines.h     |  26 +-
>  .../linux-generic/include/odp_buffer_internal.h    |   5 +-
>  platform/linux-generic/include/odp_internal.h      |   2 -
>  .../linux-generic/include/odp_packet_internal.h    |   4 +-
>  platform/linux-generic/include/odp_pool_internal.h | 143 +-------
>  platform/linux-generic/odp_buffer.c                |   3 -
>  platform/linux-generic/odp_packet.c                |  70 ++--
>  platform/linux-generic/odp_pool.c                  | 400
> +++++++++++++++++----
>  platform/linux-generic/pktio/dpdk.c                |  24 +-
>  platform/linux-generic/pktio/netmap.c              |   5 +-
>  platform/linux-generic/pktio/pcap.c                |  26 +-
>  platform/linux-generic/pktio/socket.c              |  16 +-
>  platform/linux-generic/pktio/socket_mmap.c         |   7 +-
>  platform/linux-generic/pktio/tap.c                 |   7 +-
>  14 files changed, 434 insertions(+), 304 deletions(-)
>
> diff --git a/platform/linux-generic/include/odp_buffer_inlines.h
> b/platform/linux-generic/include/odp_buffer_inlines.h
> index 3f4d9fd..2b1ab42 100644
> --- a/platform/linux-generic/include/odp_buffer_inlines.h
> +++ b/platform/linux-generic/include/odp_buffer_inlines.h
> @@ -56,30 +56,12 @@ static inline odp_buffer_hdr_t
> *odp_buf_to_hdr(odp_buffer_t buf)
>                 (pool->pool_mdata_addr + (index * ODP_CACHE_LINE_SIZE));
>  }
>
> -static inline uint32_t odp_buffer_refcount(odp_buffer_hdr_t *buf)
> +static inline uint32_t pool_id_from_buf(odp_buffer_t buf)
>  {
> -       return odp_atomic_load_u32(&buf->ref_count);
> -}
> -
> -static inline uint32_t odp_buffer_incr_refcount(odp_buffer_hdr_t *buf,
> -                                               uint32_t val)
> -{
> -       return odp_atomic_fetch_add_u32(&buf->ref_count, val) + val;
> -}
> -
> -static inline uint32_t odp_buffer_decr_refcount(odp_buffer_hdr_t *buf,
> -                                               uint32_t val)
> -{
> -       uint32_t tmp;
> -
> -       tmp = odp_atomic_fetch_sub_u32(&buf->ref_count, val);
> +       odp_buffer_bits_t handle;
>
> -       if (tmp < val) {
> -               odp_atomic_fetch_add_u32(&buf->ref_count, val - tmp);
> -               return 0;
> -       } else {
> -               return tmp - val;
> -       }
> +       handle.handle = buf;
> +       return handle.pool_id;
>  }
>
>  static inline odp_buffer_hdr_t *validate_buf(odp_buffer_t buf)
> diff --git a/platform/linux-generic/include/odp_buffer_internal.h
> b/platform/linux-generic/include/odp_buffer_internal.h
> index f21364c..07d3e8d 100644
> --- a/platform/linux-generic/include/odp_buffer_internal.h
> +++ b/platform/linux-generic/include/odp_buffer_internal.h
> @@ -114,7 +114,6 @@ struct odp_buffer_hdr_t {
>         union {
>                 uint32_t all;
>                 struct {
> -                       uint32_t zeroized:1; /* Zeroize buf data on free */
>                         uint32_t hdrdata:1;  /* Data is in buffer hdr */
>                         uint32_t sustain:1;  /* Sustain order */
>                 };
> @@ -123,7 +122,6 @@ struct odp_buffer_hdr_t {
>         int8_t                   type;       /* buffer type */
>         odp_event_type_t         event_type; /* for reuse as event */
>         uint32_t                 size;       /* max data size */
> -       odp_atomic_u32_t         ref_count;  /* reference count */
>         odp_pool_t               pool_hdl;   /* buffer pool handle */
>         union {
>                 uint64_t         buf_u64;    /* user u64 */
> @@ -171,9 +169,10 @@ typedef struct {
>  #define ODP_FREEBUF -1
>
>  /* Forward declarations */
> -odp_buffer_t buffer_alloc(odp_pool_t pool, size_t size);
>  int buffer_alloc_multi(odp_pool_t pool_hdl, size_t size,
>                        odp_buffer_t buf[], int num);
> +void buffer_free_multi(uint32_t pool_id,
> +                      const odp_buffer_t buf[], int num_free);
>  int seg_alloc_head(odp_buffer_hdr_t *buf_hdr, int segcount);
>  void seg_free_head(odp_buffer_hdr_t *buf_hdr, int segcount);
>  int seg_alloc_tail(odp_buffer_hdr_t *buf_hdr, int segcount);
> diff --git a/platform/linux-generic/include/odp_internal.h
> b/platform/linux-generic/include/odp_internal.h
> index d12f850..8bad450 100644
> --- a/platform/linux-generic/include/odp_internal.h
> +++ b/platform/linux-generic/include/odp_internal.h
> @@ -119,8 +119,6 @@ int odp_tm_term_global(void);
>  int _odp_int_name_tbl_init_global(void);
>  int _odp_int_name_tbl_term_global(void);
>
> -void _odp_flush_caches(void);
> -
>  int cpuinfo_parser(FILE *file, system_info_t *sysinfo);
>  uint64_t odp_cpu_hz_current(int id);
>
> diff --git a/platform/linux-generic/include/odp_packet_internal.h
> b/platform/linux-generic/include/odp_packet_internal.h
> index 4c4e36c..392d670 100644
> --- a/platform/linux-generic/include/odp_packet_internal.h
> +++ b/platform/linux-generic/include/odp_packet_internal.h
> @@ -306,7 +306,9 @@ static inline int
> packet_parse_not_complete(odp_packet_hdr_t *pkt_hdr)
>  /* Forward declarations */
>  int _odp_packet_copy_md_to_packet(odp_packet_t srcpkt, odp_packet_t
> dstpkt);
>
> -odp_packet_t packet_alloc(odp_pool_t pool_hdl, uint32_t len, int parse);
> +/* Packet alloc of pktios */
> +int packet_alloc_multi(odp_pool_t pool_hdl, uint32_t len,
> +                      odp_packet_t pkt[], int max_num);
>
>  /* Fill in parser metadata for L2 */
>  void packet_parse_l2(packet_parser_t *prs, uint32_t frame_len);
> diff --git a/platform/linux-generic/include/odp_pool_internal.h
> b/platform/linux-generic/include/odp_pool_internal.h
> index 3317bd0..d6717ff 100644
> --- a/platform/linux-generic/include/odp_pool_internal.h
> +++ b/platform/linux-generic/include/odp_pool_internal.h
> @@ -51,15 +51,25 @@ typedef struct _odp_buffer_pool_init_t {
>         void *buf_init_arg;        /**< Argument to be passed to
> buf_init() */
>  } _odp_buffer_pool_init_t;         /**< Type of buffer initialization
> struct */
>
> +#define POOL_MAX_LOCAL_CHUNKS 4
> +#define POOL_CHUNK_SIZE       32
> +#define POOL_MAX_LOCAL_BUFS   (POOL_MAX_LOCAL_CHUNKS * POOL_CHUNK_SIZE)
> +
> +struct local_cache_s {
> +       uint64_t bufallocs;  /* Local buffer alloc count */
> +       uint64_t buffrees;   /* Local buffer free count */
> +
> +       uint32_t num_buf;
> +       odp_buffer_hdr_t *buf[POOL_MAX_LOCAL_BUFS];
> +};
> +
>  /* Local cache for buffer alloc/free acceleration */
>  typedef struct local_cache_t {
>         union {
> -               struct {
> -                       odp_buffer_hdr_t *buf_freelist;  /* The local
> cache */
> -                       uint64_t bufallocs;  /* Local buffer alloc count */
> -                       uint64_t buffrees;   /* Local buffer free count */
> -               };
> -               uint8_t pad[ODP_CACHE_LINE_SIZE_ROUNDUP(sizeof(uint64_t))];
> +               struct local_cache_s s;
> +
> +               uint8_t pad[ODP_CACHE_LINE_SIZE_ROUNDUP(
> +                           sizeof(struct local_cache_s))];
>         };
>  } local_cache_t;
>
> @@ -214,127 +224,6 @@ static inline void ret_blk(struct pool_entry_s
> *pool, void *block)
>         odp_atomic_inc_u64(&pool->poolstats.blkfrees);
>  }
>
> -static inline odp_buffer_hdr_t *get_buf(struct pool_entry_s *pool)
> -{
> -       odp_buffer_hdr_t *myhead;
> -       POOL_LOCK(&pool->buf_lock);
> -
> -       myhead = pool->buf_freelist;
> -
> -       if (odp_unlikely(myhead == NULL)) {
> -               POOL_UNLOCK(&pool->buf_lock);
> -               odp_atomic_inc_u64(&pool->poolstats.bufempty);
> -       } else {
> -               pool->buf_freelist = myhead->next;
> -               POOL_UNLOCK(&pool->buf_lock);
> -               uint64_t bufcount =
> -                       odp_atomic_fetch_sub_u32(&pool->bufcount, 1) - 1;
> -
> -               /* Check for low watermark condition */
> -               if (bufcount == pool->buf_low_wm &&
> !pool->buf_low_wm_assert) {
> -                       pool->buf_low_wm_assert = 1;
> -
>  odp_atomic_inc_u64(&pool->poolstats.buf_low_wm_count);
> -               }
> -
> -               odp_atomic_inc_u64(&pool->poolstats.bufallocs);
> -       }
> -
> -       return (void *)myhead;
> -}
> -
> -static inline void ret_buf(struct pool_entry_s *pool, odp_buffer_hdr_t
> *buf)
> -{
> -       if (!buf->flags.hdrdata && buf->type != ODP_EVENT_BUFFER) {
> -               while (buf->segcount > 0) {
> -                       if (buffer_is_secure(buf) || pool_is_secure(pool))
> -                               memset(buf->addr[buf->segcount - 1],
> -                                      0, buf->segsize);
> -                       ret_blk(pool, buf->addr[--buf->segcount]);
> -               }
> -               buf->size = 0;
> -       }
> -
> -       buf->allocator = ODP_FREEBUF;  /* Mark buffer free */
> -       POOL_LOCK(&pool->buf_lock);
> -       buf->next = pool->buf_freelist;
> -       pool->buf_freelist = buf;
> -       POOL_UNLOCK(&pool->buf_lock);
> -
> -       uint64_t bufcount = odp_atomic_fetch_add_u32(&pool->bufcount, 1) +
> 1;
> -
> -       /* Check if low watermark condition should be deasserted */
> -       if (bufcount == pool->buf_high_wm && pool->buf_low_wm_assert) {
> -               pool->buf_low_wm_assert = 0;
> -               odp_atomic_inc_u64(&pool->poolstats.buf_high_wm_count);
> -       }
> -
> -       odp_atomic_inc_u64(&pool->poolstats.buffrees);
> -}
> -
> -static inline void *get_local_buf(local_cache_t *buf_cache,
> -                                 struct pool_entry_s *pool,
> -                                 size_t totsize)
> -{
> -       odp_buffer_hdr_t *buf = buf_cache->buf_freelist;
> -
> -       if (odp_likely(buf != NULL)) {
> -               buf_cache->buf_freelist = buf->next;
> -
> -               if (odp_unlikely(buf->size < totsize)) {
> -                       intmax_t needed = totsize - buf->size;
> -
> -                       do {
> -                               void *blk = get_blk(pool);
> -                               if (odp_unlikely(blk == NULL)) {
> -                                       ret_buf(pool, buf);
> -                                       buf_cache->buffrees--;
> -                                       return NULL;
> -                               }
> -                               buf->addr[buf->segcount++] = blk;
> -                               needed -= pool->seg_size;
> -                       } while (needed > 0);
> -
> -                       buf->size = buf->segcount * pool->seg_size;
> -               }
> -
> -               buf_cache->bufallocs++;
> -       }
> -
> -       return buf;
> -}
> -
> -static inline void ret_local_buf(local_cache_t *buf_cache,
> -                               odp_buffer_hdr_t *buf)
> -{
> -       buf->allocator = ODP_FREEBUF;
> -       buf->next = buf_cache->buf_freelist;
> -       buf_cache->buf_freelist = buf;
> -
> -       buf_cache->buffrees++;
> -}
> -
> -static inline void flush_cache(local_cache_t *buf_cache,
> -                              struct pool_entry_s *pool)
> -{
> -       odp_buffer_hdr_t *buf = buf_cache->buf_freelist;
> -       uint32_t flush_count = 0;
> -
> -       while (buf != NULL) {
> -               odp_buffer_hdr_t *next = buf->next;
> -               ret_buf(pool, buf);
> -               buf = next;
> -               flush_count++;
> -       }
> -
> -       odp_atomic_add_u64(&pool->poolstats.bufallocs,
> buf_cache->bufallocs);
> -       odp_atomic_add_u64(&pool->poolstats.buffrees,
> -                          buf_cache->buffrees - flush_count);
> -
> -       buf_cache->buf_freelist = NULL;
> -       buf_cache->bufallocs = 0;
> -       buf_cache->buffrees = 0;
> -}
> -
>  static inline odp_pool_t pool_index_to_handle(uint32_t pool_id)
>  {
>         return _odp_cast_scalar(odp_pool_t, pool_id);
> diff --git a/platform/linux-generic/odp_buffer.c
> b/platform/linux-generic/odp_buffer.c
> index e7e4d58..ce2fdba 100644
> --- a/platform/linux-generic/odp_buffer.c
> +++ b/platform/linux-generic/odp_buffer.c
> @@ -67,9 +67,6 @@ int odp_buffer_snprint(char *str, uint32_t n,
> odp_buffer_t buf)
>         len += snprintf(&str[len], n-len,
>                         "  size         %" PRIu32 "\n",        hdr->size);
>         len += snprintf(&str[len], n-len,
> -                       "  ref_count    %" PRIu32 "\n",
> -                       odp_atomic_load_u32(&hdr->ref_count));
> -       len += snprintf(&str[len], n-len,
>                         "  type         %i\n",        hdr->type);
>
>         return len;
> diff --git a/platform/linux-generic/odp_packet.c
> b/platform/linux-generic/odp_packet.c
> index 0e319d2..dfb6f56 100644
> --- a/platform/linux-generic/odp_packet.c
> +++ b/platform/linux-generic/odp_packet.c
> @@ -76,35 +76,48 @@ static void packet_init(pool_entry_t *pool,
> odp_packet_hdr_t *pkt_hdr,
>         pkt_hdr->input = ODP_PKTIO_INVALID;
>  }
>
> -odp_packet_t packet_alloc(odp_pool_t pool_hdl, uint32_t len, int parse)
> +int packet_alloc_multi(odp_pool_t pool_hdl, uint32_t len,
> +                      odp_packet_t pkt[], int max_num)
>  {
> -       odp_packet_t pkt;
>         odp_packet_hdr_t *pkt_hdr;
>         pool_entry_t *pool = odp_pool_to_entry(pool_hdl);
> +       int num, i;
>
> -       if (pool->s.params.type != ODP_POOL_PACKET)
> -               return ODP_PACKET_INVALID;
> -
> -       /* Handle special case for zero-length packets */
> -       if (len == 0) {
> -               len = pool->s.params.buf.size;
> +       num = buffer_alloc_multi(pool_hdl, len, (odp_buffer_t *)pkt,
> max_num);
>
> -               pkt = (odp_packet_t)buffer_alloc(pool_hdl, len);
> +       for (i = 0; i < num; i++) {
> +               pkt_hdr = odp_packet_hdr(pkt[i]);
> +               packet_init(pool, pkt_hdr, len, 1 /* do parse */);
>
> -               if (pkt == ODP_PACKET_INVALID)
> -                       return ODP_PACKET_INVALID;
> +               if (pkt_hdr->tailroom >= pkt_hdr->buf_hdr.segsize)
> +                       pull_tail_seg(pkt_hdr);
> +       }
>
> -               pull_tail(odp_packet_hdr(pkt), len);
> +       return num;
> +}
>
> -       } else {
> -               pkt = (odp_packet_t)buffer_alloc(pool_hdl, len);
> +odp_packet_t odp_packet_alloc(odp_pool_t pool_hdl, uint32_t len)
> +{
> +       pool_entry_t *pool = odp_pool_to_entry(pool_hdl);
> +       size_t pkt_size = len ? len : pool->s.params.buf.size;
> +       int count;
> +       odp_packet_t pkt;
> +       odp_packet_hdr_t *pkt_hdr;
>
> -               if (pkt == ODP_PACKET_INVALID)
> -                       return ODP_PACKET_INVALID;
> +       if (pool->s.params.type != ODP_POOL_PACKET) {
> +               __odp_errno = EINVAL;
> +               return ODP_PACKET_INVALID;
>         }
>
> +       count = buffer_alloc_multi(pool_hdl, pkt_size, (odp_buffer_t
> *)&pkt, 1);
> +
> +       if (count != 1)
> +               return ODP_PACKET_INVALID;
> +
>         pkt_hdr = odp_packet_hdr(pkt);
> -       packet_init(pool, pkt_hdr, len, parse);
> +       packet_init(pool, pkt_hdr, pkt_size, 0 /* do not parse */);
> +       if (len == 0)
> +               pull_tail(pkt_hdr, pkt_size);
>
>         if (pkt_hdr->tailroom >= pkt_hdr->buf_hdr.segsize)
>                 pull_tail_seg(pkt_hdr);
> @@ -112,11 +125,6 @@ odp_packet_t packet_alloc(odp_pool_t pool_hdl,
> uint32_t len, int parse)
>         return pkt;
>  }
>
> -odp_packet_t odp_packet_alloc(odp_pool_t pool_hdl, uint32_t len)
> -{
> -       return packet_alloc(pool_hdl, len, 0);
> -}
> -
>  int odp_packet_alloc_multi(odp_pool_t pool_hdl, uint32_t len,
>                            odp_packet_t pkt[], int num)
>  {
> @@ -135,9 +143,12 @@ int odp_packet_alloc_multi(odp_pool_t pool_hdl,
> uint32_t len,
>         for (i = 0; i < count; ++i) {
>                 odp_packet_hdr_t *pkt_hdr = odp_packet_hdr(pkt[i]);
>
> -               packet_init(pool, pkt_hdr, pkt_size, 0);
> +               packet_init(pool, pkt_hdr, pkt_size, 0 /* do not parse */);
>                 if (len == 0)
>                         pull_tail(pkt_hdr, pkt_size);
> +
> +               if (pkt_hdr->tailroom >= pkt_hdr->buf_hdr.segsize)
> +                       pull_tail_seg(pkt_hdr);
>         }
>
>         return count;
> @@ -145,12 +156,16 @@ int odp_packet_alloc_multi(odp_pool_t pool_hdl,
> uint32_t len,
>
>  void odp_packet_free(odp_packet_t pkt)
>  {
> -       odp_buffer_free((odp_buffer_t)pkt);
> +       uint32_t pool_id = pool_id_from_buf((odp_buffer_t)pkt);
> +
> +       buffer_free_multi(pool_id, (odp_buffer_t *)&pkt, 1);
>  }
>
>  void odp_packet_free_multi(const odp_packet_t pkt[], int num)
>  {
> -       odp_buffer_free_multi((const odp_buffer_t *)pkt, num);
> +       uint32_t pool_id = pool_id_from_buf((odp_buffer_t)pkt[0]);
> +
> +       buffer_free_multi(pool_id, (const odp_buffer_t * const)pkt, num);
>  }
>
>  int odp_packet_reset(odp_packet_t pkt, uint32_t len)
> @@ -972,10 +987,7 @@ int _odp_packet_copy_md_to_packet(odp_packet_t
> srcpkt, odp_packet_t dstpkt)
>                        srchdr->buf_hdr.uarea_size ?
>                        dsthdr->buf_hdr.uarea_size :
>                        srchdr->buf_hdr.uarea_size);
> -       odp_atomic_store_u32(
> -               &dsthdr->buf_hdr.ref_count,
> -               odp_atomic_load_u32(
> -                       &srchdr->buf_hdr.ref_count));
> +
>         copy_packet_parser_metadata(srchdr, dsthdr);
>
>         /* Metadata copied, but return indication of whether the packet
> diff --git a/platform/linux-generic/odp_pool.c
> b/platform/linux-generic/odp_pool.c
> index ec6d86a..5e4b6fb 100644
> --- a/platform/linux-generic/odp_pool.c
> +++ b/platform/linux-generic/odp_pool.c
> @@ -57,8 +57,15 @@ static const char SHM_DEFAULT_NAME[] =
> "odp_buffer_pools";
>  /* Pool entry pointers (for inlining) */
>  void *pool_entry_ptr[ODP_CONFIG_POOLS];
>
> -/* Cache thread id locally for local cache performance */
> -static __thread int local_id;
> +/* Thread local variables */
> +typedef struct pool_local_t {
> +       local_cache_t *cache[ODP_CONFIG_POOLS];
> +       int thr_id;
> +} pool_local_t;
> +
> +static __thread pool_local_t local;
> +
> +static void flush_cache(local_cache_t *buf_cache, struct pool_entry_s
> *pool);
>
>  int odp_pool_init_global(void)
>  {
> @@ -111,7 +118,19 @@ int odp_pool_init_global(void)
>
>  int odp_pool_init_local(void)
>  {
> -       local_id = odp_thread_id();
> +       pool_entry_t *pool;
> +       int i;
> +       int thr_id = odp_thread_id();
> +
> +       memset(&local, 0, sizeof(pool_local_t));
> +
> +       for (i = 0; i < ODP_CONFIG_POOLS; i++) {
> +               pool           = get_pool_entry(i);
> +               local.cache[i] = &pool->s.local_cache[thr_id];
> +               local.cache[i]->s.num_buf = 0;
> +       }
> +
> +       local.thr_id = thr_id;
>         return 0;
>  }
>
> @@ -144,7 +163,14 @@ int odp_pool_term_global(void)
>
>  int odp_pool_term_local(void)
>  {
> -       _odp_flush_caches();
> +       int i;
> +
> +       for (i = 0; i < ODP_CONFIG_POOLS; i++) {
> +               pool_entry_t *pool = get_pool_entry(i);
> +
> +               flush_cache(local.cache[i], &pool->s);
> +       }
> +
>         return 0;
>  }
>
> @@ -179,10 +205,53 @@ int odp_pool_capability(odp_pool_capability_t *capa)
>         return 0;
>  }
>
> -/**
> +static inline odp_buffer_hdr_t *get_buf(struct pool_entry_s *pool)
> +{
> +       odp_buffer_hdr_t *myhead;
> +
> +       POOL_LOCK(&pool->buf_lock);
> +
> +       myhead = pool->buf_freelist;
> +
> +       if (odp_unlikely(myhead == NULL)) {
> +               POOL_UNLOCK(&pool->buf_lock);
> +               odp_atomic_inc_u64(&pool->poolstats.bufempty);
> +       } else {
> +               pool->buf_freelist = myhead->next;
> +               POOL_UNLOCK(&pool->buf_lock);
> +
> +               odp_atomic_fetch_sub_u32(&pool->bufcount, 1);
> +               odp_atomic_inc_u64(&pool->poolstats.bufallocs);
> +       }
> +
> +       return (void *)myhead;
> +}
> +
> +static inline void ret_buf(struct pool_entry_s *pool, odp_buffer_hdr_t
> *buf)
> +{
> +       if (!buf->flags.hdrdata && buf->type != ODP_EVENT_BUFFER) {
> +               while (buf->segcount > 0) {
> +                       if (buffer_is_secure(buf) || pool_is_secure(pool))
> +                               memset(buf->addr[buf->segcount - 1],
> +                                      0, buf->segsize);
> +                       ret_blk(pool, buf->addr[--buf->segcount]);
> +               }
> +               buf->size = 0;
> +       }
> +
> +       buf->allocator = ODP_FREEBUF;  /* Mark buffer free */
> +       POOL_LOCK(&pool->buf_lock);
> +       buf->next = pool->buf_freelist;
> +       pool->buf_freelist = buf;
> +       POOL_UNLOCK(&pool->buf_lock);
> +
> +       odp_atomic_fetch_add_u32(&pool->bufcount, 1);
> +       odp_atomic_inc_u64(&pool->poolstats.buffrees);
> +}
> +
> +/*
>   * Pool creation
>   */
> -
>  odp_pool_t _pool_create(const char *name,
>                         odp_pool_param_t *params,
>                         uint32_t shmflags)
> @@ -208,9 +277,6 @@ odp_pool_t _pool_create(const char *name,
>         /* Restriction for v1.0: All non-packet buffers are unsegmented */
>         int unseg = 1;
>
> -       /* Restriction for v1.0: No zeroization support */
> -       const int zeroized = 0;
> -
>         uint32_t blk_size, buf_stride, buf_num, blk_num, seg_len = 0;
>         uint32_t buf_align =
>                 params->type == ODP_POOL_BUFFER ? params->buf.align : 0;
> @@ -350,7 +416,6 @@ odp_pool_t _pool_create(const char *name,
>                 POOL_UNLOCK(&pool->s.lock);
>
>                 pool->s.flags.unsegmented = unseg;
> -               pool->s.flags.zeroized = zeroized;
>                 pool->s.seg_size = unseg ? blk_size : seg_len;
>                 pool->s.blk_size = blk_size;
>
> @@ -383,9 +448,7 @@ odp_pool_t _pool_create(const char *name,
>                         /* Iniitalize buffer metadata */
>                         tmp->allocator = ODP_FREEBUF;
>                         tmp->flags.all = 0;
> -                       tmp->flags.zeroized = zeroized;
>                         tmp->size = 0;
> -                       odp_atomic_init_u32(&tmp->ref_count, 0);
>                         tmp->type = params->type;
>                         tmp->event_type = params->type;
>                         tmp->pool_hdl = pool->s.pool_hdl;
> @@ -502,6 +565,41 @@ int odp_pool_info(odp_pool_t pool_hdl,
> odp_pool_info_t *info)
>         return 0;
>  }
>
> +static inline void get_local_cache_bufs(local_cache_t *buf_cache,
> uint32_t idx,
> +                                       odp_buffer_hdr_t *buf_hdr[],
> +                                       uint32_t num)
> +{
> +       uint32_t i;
> +
> +       for (i = 0; i < num; i++) {
> +               buf_hdr[i] = buf_cache->s.buf[idx + i];
> +               odp_prefetch(buf_hdr[i]);
> +               odp_prefetch_store(buf_hdr[i]);
> +       }
> +}
> +
> +static void flush_cache(local_cache_t *buf_cache, struct pool_entry_s
> *pool)
> +{
> +       uint32_t flush_count = 0;
> +       uint32_t num;
> +
> +       while ((num = buf_cache->s.num_buf)) {
> +               odp_buffer_hdr_t *buf;
> +
> +               buf = buf_cache->s.buf[num - 1];
> +               ret_buf(pool, buf);
> +               flush_count++;
> +               buf_cache->s.num_buf--;
> +       }
> +
> +       odp_atomic_add_u64(&pool->poolstats.bufallocs,
> buf_cache->s.bufallocs);
> +       odp_atomic_add_u64(&pool->poolstats.buffrees,
> +                          buf_cache->s.buffrees - flush_count);
> +
> +       buf_cache->s.bufallocs = 0;
> +       buf_cache->s.buffrees = 0;
> +}
> +
>  int odp_pool_destroy(odp_pool_t pool_hdl)
>  {
>         uint32_t pool_id = pool_handle_to_index(pool_hdl);
> @@ -620,77 +718,157 @@ void seg_free_tail(odp_buffer_hdr_t *buf_hdr, int
> segcount)
>         buf_hdr->size      = buf_hdr->segcount * pool->s.seg_size;
>  }
>
> -odp_buffer_t buffer_alloc(odp_pool_t pool_hdl, size_t size)
> +static inline int get_local_bufs(local_cache_t *buf_cache,
> +                                odp_buffer_hdr_t *buf_hdr[], uint32_t
> max_num)
> +{
> +       uint32_t num_buf = buf_cache->s.num_buf;
> +       uint32_t num = num_buf;
> +
> +       if (odp_unlikely(num_buf == 0))
> +               return 0;
> +
> +       if (odp_likely(max_num < num))
> +               num = max_num;
> +
> +       get_local_cache_bufs(buf_cache, num_buf - num, buf_hdr, num);
> +       buf_cache->s.num_buf   -= num;
> +       buf_cache->s.bufallocs += num;
> +
> +       return num;
> +}
> +
> +static inline void ret_local_buf(local_cache_t *buf_cache, uint32_t idx,
> +                                odp_buffer_hdr_t *buf)
> +{
> +       buf_cache->s.buf[idx] = buf;
> +       buf_cache->s.num_buf++;
> +       buf_cache->s.buffrees++;
> +}
> +
> +static inline void ret_local_bufs(local_cache_t *buf_cache, uint32_t idx,
> +                                 odp_buffer_hdr_t *buf[], int num_buf)
> +{
> +       int i;
> +
> +       for (i = 0; i < num_buf; i++)
> +               buf_cache->s.buf[idx + i] = buf[i];
> +
> +       buf_cache->s.num_buf  += num_buf;
> +       buf_cache->s.buffrees += num_buf;
> +}
> +
> +int buffer_alloc_multi(odp_pool_t pool_hdl, size_t size,
> +                      odp_buffer_t buf[], int max_num)
>  {
>         uint32_t pool_id = pool_handle_to_index(pool_hdl);
>         pool_entry_t *pool = get_pool_entry(pool_id);
>         uintmax_t totsize = pool->s.headroom + size + pool->s.tailroom;
> -       odp_anybuf_t *buf;
> +       odp_buffer_hdr_t *buf_tbl[max_num];
> +       odp_buffer_hdr_t *buf_hdr;
> +       int num, i;
> +       intmax_t needed;
> +       void *blk;
>
>         /* Reject oversized allocation requests */
>         if ((pool->s.flags.unsegmented && totsize > pool->s.seg_size) ||
>             (!pool->s.flags.unsegmented &&
>              totsize > pool->s.seg_size * ODP_BUFFER_MAX_SEG))
> -               return ODP_BUFFER_INVALID;
> +               return 0;
>
>         /* Try to satisfy request from the local cache */
> -       buf = (odp_anybuf_t *)
> -               (void *)get_local_buf(&pool->s.local_cache[local_id],
> -                                     &pool->s, totsize);
> +       num = get_local_bufs(local.cache[pool_id], buf_tbl, max_num);
>
>         /* If cache is empty, satisfy request from the pool */
> -       if (odp_unlikely(buf == NULL)) {
> -               buf = (odp_anybuf_t *)(void *)get_buf(&pool->s);
> +       if (odp_unlikely(num < max_num)) {
> +               for (; num < max_num; num++) {
> +                       buf_hdr = get_buf(&pool->s);
> +
> +                       if (odp_unlikely(buf_hdr == NULL))
> +                               goto pool_empty;
> +
> +                       /* Get blocks for this buffer, if pool uses
> +                        * application data */
> +                       if (buf_hdr->size < totsize) {
> +                               uint32_t segcount;
> +
> +                               needed = totsize - buf_hdr->size;
> +                               do {
> +                                       blk = get_blk(&pool->s);
> +                                       if (odp_unlikely(blk == NULL)) {
> +                                               ret_buf(&pool->s, buf_hdr);
> +                                               goto pool_empty;
> +                                       }
> +
> +                                       segcount = buf_hdr->segcount++;
> +                                       buf_hdr->addr[segcount] = blk;
> +                                       needed -= pool->s.seg_size;
> +                               } while (needed > 0);
> +                               buf_hdr->size = buf_hdr->segcount *
> +                                               pool->s.seg_size;
> +                       }
> +
> +                       buf_tbl[num] = buf_hdr;
> +               }
> +       }
>
> -               if (odp_unlikely(buf == NULL))
> -                       return ODP_BUFFER_INVALID;
> +pool_empty:
> +       for (i = 0; i < num; i++) {
> +               buf_hdr = buf_tbl[i];
>
> -               /* Get blocks for this buffer, if pool uses application data */
> -               if (buf->buf.size < totsize) {
> -                       intmax_t needed = totsize - buf->buf.size;
> +               /* Mark buffer as allocated */
> +               buf_hdr->allocator = local.thr_id;
> +
> +               /* By default, buffers are not associated with
> +                * an ordered queue */
> +               buf_hdr->origin_qe = NULL;
> +
> +               buf[i] = odp_hdr_to_buf(buf_hdr);
> +
> +               /* Add more segments if buffer from local cache is too small */
> +               if (odp_unlikely(buf_hdr->size < totsize)) {
> +                       needed = totsize - buf_hdr->size;
>                         do {
> -                               uint8_t *blk = get_blk(&pool->s);
> -                               if (blk == NULL) {
> -                                       ret_buf(&pool->s, &buf->buf);
> -                                       return ODP_BUFFER_INVALID;
> +                               blk = get_blk(&pool->s);
> +                               if (odp_unlikely(blk == NULL)) {
> +                                       int j;
> +
> +                                       ret_buf(&pool->s, buf_hdr);
> +                                       buf_hdr = NULL;
> +                                       local.cache[pool_id]->s.buffrees--;
> +
> +                                       /* move remaining bufs up one step
> +                                        * and update loop counters */
> +                                       num--;
> +                                       for (j = i; j < num; j++)
> +                                               buf_tbl[j] = buf_tbl[j + 1];
> +
> +                                       i--;
> +                                       break;
>                                 }
> -                               buf->buf.addr[buf->buf.segcount++] = blk;
>                                 needed -= pool->s.seg_size;
> +                               buf_hdr->addr[buf_hdr->segcount++] = blk;
> +                               buf_hdr->size = buf_hdr->segcount *
> +                                               pool->s.seg_size;
>                         } while (needed > 0);
> -                       buf->buf.size = buf->buf.segcount * pool->s.seg_size;
>                 }
>         }
>
> -       /* Mark buffer as allocated */
> -       buf->buf.allocator = local_id;
> -
> -       /* By default, buffers inherit their pool's zeroization setting */
> -       buf->buf.flags.zeroized = pool->s.flags.zeroized;
> -
> -       /* By default, buffers are not associated with an ordered queue */
> -       buf->buf.origin_qe = NULL;
> -
> -       return odp_hdr_to_buf(&buf->buf);
> +       return num;
>  }
>
> -int buffer_alloc_multi(odp_pool_t pool_hdl, size_t size,
> -                      odp_buffer_t buf[], int num)
> +odp_buffer_t odp_buffer_alloc(odp_pool_t pool_hdl)
>  {
> -       int count;
> +       odp_buffer_t buf;
> +       int num;
>
> -       for (count = 0; count < num; ++count) {
> -               buf[count] = buffer_alloc(pool_hdl, size);
> -               if (buf[count] == ODP_BUFFER_INVALID)
> -                       break;
> -       }
> +       num = buffer_alloc_multi(pool_hdl,
> +                                odp_pool_to_entry(pool_hdl)->s.params.buf.size,
> +                                &buf, 1);
>
> -       return count;
> -}
> +       if (odp_unlikely(num != 1))
> +               return ODP_BUFFER_INVALID;
>
> -odp_buffer_t odp_buffer_alloc(odp_pool_t pool_hdl)
> -{
> -       return buffer_alloc(pool_hdl,
> -                           odp_pool_to_entry(pool_hdl)->s.params.buf.size);
> +       return buf;
>  }
>
>  int odp_buffer_alloc_multi(odp_pool_t pool_hdl, odp_buffer_t buf[], int num)
> @@ -700,35 +878,105 @@ int odp_buffer_alloc_multi(odp_pool_t pool_hdl, odp_buffer_t buf[], int num)
>         return buffer_alloc_multi(pool_hdl, buf_size, buf, num);
>  }
>
> -void odp_buffer_free(odp_buffer_t buf)
> +static void multi_pool_free(odp_buffer_hdr_t *buf_hdr[], int num_buf)
>  {
> -       odp_buffer_hdr_t *buf_hdr = odp_buf_to_hdr(buf);
> -       pool_entry_t *pool = odp_buf_to_pool(buf_hdr);
> +       uint32_t pool_id, num;
> +       local_cache_t *buf_cache;
> +       pool_entry_t *pool;
> +       int i, j, idx;
> +
> +       for (i = 0; i < num_buf; i++) {
> +               pool_id   =  pool_handle_to_index(buf_hdr[i]->pool_hdl);
> +               buf_cache = local.cache[pool_id];
> +               num       = buf_cache->s.num_buf;
>
> -       ODP_ASSERT(buf_hdr->allocator != ODP_FREEBUF);
> +               if (num < POOL_MAX_LOCAL_BUFS) {
> +                       ret_local_buf(buf_cache, num, buf_hdr[i]);
> +                       continue;
> +               }
> +
> +               idx  = POOL_MAX_LOCAL_BUFS - POOL_CHUNK_SIZE;
> +               pool = get_pool_entry(pool_id);
> +
> +               /* local cache full, return a chunk */
> +               for (j = 0; j < POOL_CHUNK_SIZE; j++) {
> +                       odp_buffer_hdr_t *tmp;
> +
> +                       tmp = buf_cache->s.buf[idx + i];
> +                       ret_buf(&pool->s, tmp);
> +               }
>
> -       if (odp_unlikely(pool->s.buf_low_wm_assert || pool->s.blk_low_wm_assert))
> -               ret_buf(&pool->s, buf_hdr);
> -       else
> -               ret_local_buf(&pool->s.local_cache[local_id], buf_hdr);
> +               num = POOL_MAX_LOCAL_BUFS - POOL_CHUNK_SIZE;
> +               buf_cache->s.num_buf = num;
> +               ret_local_buf(buf_cache, num, buf_hdr[i]);
> +       }
>  }
>
> -void odp_buffer_free_multi(const odp_buffer_t buf[], int num)
> +void buffer_free_multi(uint32_t pool_id,
> +                      const odp_buffer_t buf[], int num_free)
>  {
> -       int i;
> +       local_cache_t *buf_cache = local.cache[pool_id];
> +       uint32_t num;
> +       int i, idx;
> +       pool_entry_t *pool;
> +       odp_buffer_hdr_t *buf_hdr[num_free];
> +       int multi_pool = 0;
> +
> +       for (i = 0; i < num_free; i++) {
> +               uint32_t id;
> +
> +               buf_hdr[i] = odp_buf_to_hdr(buf[i]);
> +               ODP_ASSERT(buf_hdr[i]->allocator != ODP_FREEBUF);
> +               buf_hdr[i]->allocator = ODP_FREEBUF;
> +               id = pool_handle_to_index(buf_hdr[i]->pool_hdl);
> +               multi_pool |= (pool_id != id);
> +       }
> +
> +       if (odp_unlikely(multi_pool)) {
> +               multi_pool_free(buf_hdr, num_free);
> +               return;
> +       }
> +
> +       num = buf_cache->s.num_buf;
> +
> +       if (odp_likely((num + num_free) < POOL_MAX_LOCAL_BUFS)) {
> +               ret_local_bufs(buf_cache, num, buf_hdr, num_free);
> +               return;
> +       }
> +
> +       pool = get_pool_entry(pool_id);
> +
> +       /* Return at least one chunk into the global pool */
> +       if (odp_unlikely(num_free > POOL_CHUNK_SIZE)) {
> +               for (i = 0; i < num_free; i++)
> +                       ret_buf(&pool->s, buf_hdr[i]);
> +
> +               return;
> +       }
> +
> +       idx = num - POOL_CHUNK_SIZE;
> +       for (i = 0; i < POOL_CHUNK_SIZE; i++)
> +               ret_buf(&pool->s, buf_cache->s.buf[idx + i]);
>
> -       for (i = 0; i < num; ++i)
> -               odp_buffer_free(buf[i]);
> +       num -= POOL_CHUNK_SIZE;
> +       buf_cache->s.num_buf = num;
> +       ret_local_bufs(buf_cache, num, buf_hdr, num_free);
> +
> +       return;
>  }
>
> -void _odp_flush_caches(void)
> +void odp_buffer_free(odp_buffer_t buf)
>  {
> -       int i;
> +       uint32_t pool_id = pool_id_from_buf(buf);
>
> -       for (i = 0; i < ODP_CONFIG_POOLS; i++) {
> -               pool_entry_t *pool = get_pool_entry(i);
> -               flush_cache(&pool->s.local_cache[local_id], &pool->s);
> -       }
> +       buffer_free_multi(pool_id, &buf, 1);
> +}
> +
> +void odp_buffer_free_multi(const odp_buffer_t buf[], int num)
> +{
> +       uint32_t pool_id = pool_id_from_buf(buf[0]);
> +
> +       buffer_free_multi(pool_id, buf, num);
>  }
>
>  void odp_pool_print(odp_pool_t pool_hdl)
> @@ -773,7 +1021,6 @@ void odp_pool_print(odp_pool_t pool_hdl)
>                 pool->s.quiesced ? "quiesced" : "active");
>         ODP_DBG(" pool opts       %s, %s, %s\n",
>                 pool->s.flags.unsegmented ? "unsegmented" : "segmented",
> -               pool->s.flags.zeroized ? "zeroized" : "non-zeroized",
>                 pool->s.flags.predefined  ? "predefined" : "created");
>         ODP_DBG(" pool base       %p\n",  pool->s.pool_base_addr);
>         ODP_DBG(" pool size       %zu (%zu pages)\n",
> @@ -816,10 +1063,11 @@ void odp_pool_print(odp_pool_t pool_hdl)
>         ODP_DBG(" blk low wm count    %lu\n", blklowmct);
>  }
>
> -
>  odp_pool_t odp_buffer_pool(odp_buffer_t buf)
>  {
> -       return odp_buf_to_hdr(buf)->pool_hdl;
> +       uint32_t pool_id = pool_id_from_buf(buf);
> +
> +       return pool_index_to_handle(pool_id);
>  }
>
>  void odp_pool_param_init(odp_pool_param_t *params)
> diff --git a/platform/linux-generic/pktio/dpdk.c b/platform/linux-generic/pktio/dpdk.c
> index c21c703..17d63df 100644
> --- a/platform/linux-generic/pktio/dpdk.c
> +++ b/platform/linux-generic/pktio/dpdk.c
> @@ -696,7 +696,7 @@ static int dpdk_stop(pktio_entry_t *pktio_entry)
>  static inline int mbuf_to_pkt(pktio_entry_t *pktio_entry,
>                               odp_packet_t pkt_table[],
>                               struct rte_mbuf *mbuf_table[],
> -                             uint16_t num, odp_time_t *ts)
> +                             uint16_t mbuf_num, odp_time_t *ts)
>  {
>         odp_packet_t pkt;
>         odp_packet_hdr_t *pkt_hdr;
> @@ -705,9 +705,15 @@ static inline int mbuf_to_pkt(pktio_entry_t *pktio_entry,
>         void *buf;
>         int i, j;
>         int nb_pkts = 0;
> +       int alloc_len, num;
> +       odp_pool_t pool = pktio_entry->s.pkt_dpdk.pool;
> +
> +       /* Allocate maximum sized packets */
> +       alloc_len = pktio_entry->s.pkt_dpdk.data_room;
> +
> +       num = packet_alloc_multi(pool, alloc_len, pkt_table, mbuf_num);
>
>         for (i = 0; i < num; i++) {
> -               odp_pool_t pool = pktio_entry->s.pkt_dpdk.pool;
>                 odp_packet_hdr_t parsed_hdr;
>
>                 mbuf = mbuf_table[i];
> @@ -728,18 +734,16 @@ static inline int mbuf_to_pkt(pktio_entry_t *pktio_entry,
>                                                 &parsed_hdr))
>                                 goto fail;
>                 }
> -               pkt = packet_alloc(pool, pkt_len, 1);
> -               if (pkt == ODP_PACKET_INVALID)
> -                       goto fail;
>
> +               pkt     = pkt_table[i];
>                 pkt_hdr = odp_packet_hdr(pkt);
> +               pull_tail(pkt_hdr, alloc_len - pkt_len);
>
>                 /* For now copy the data in the mbuf,
>                    worry about zero-copy later */
> -               if (odp_packet_copy_from_mem(pkt, 0, pkt_len, buf) != 0) {
> -                       odp_packet_free(pkt);
> +               if (odp_packet_copy_from_mem(pkt, 0, pkt_len, buf) != 0)
>                         goto fail;
> -               }
> +
>                 pkt_hdr->input = pktio_entry->s.handle;
>
>                 if (pktio_cls_enabled(pktio_entry))
> @@ -760,7 +764,9 @@ static inline int mbuf_to_pkt(pktio_entry_t *pktio_entry,
>         return nb_pkts;
>
>  fail:
> -       for (j = i; j < num; j++)
> +       odp_packet_free_multi(&pkt_table[i], mbuf_num - i);
> +
> +       for (j = i; j < mbuf_num; j++)
>                 rte_pktmbuf_free(mbuf_table[j]);
>
>         return (i > 0 ? i : -1);
> diff --git a/platform/linux-generic/pktio/netmap.c b/platform/linux-generic/pktio/netmap.c
> index d69df6b..67e50b7 100644
> --- a/platform/linux-generic/pktio/netmap.c
> +++ b/platform/linux-generic/pktio/netmap.c
> @@ -598,6 +598,7 @@ static inline int netmap_pkt_to_odp(pktio_entry_t *pktio_entry,
>         odp_pool_t pool = pktio_entry->s.pkt_nm.pool;
>         odp_packet_hdr_t *pkt_hdr;
>         odp_packet_hdr_t parsed_hdr;
> +       int num;
>
>         if (odp_unlikely(len > pktio_entry->s.pkt_nm.max_frame_len)) {
>                 ODP_ERR("RX: frame too big %" PRIu16 " %zu!\n", len,
> @@ -615,8 +616,8 @@ static inline int netmap_pkt_to_odp(pktio_entry_t *pktio_entry,
>                                         len, &pool, &parsed_hdr))
>                         return -1;
>         }
> -       pkt = packet_alloc(pool, len, 1);
> -       if (pkt == ODP_PACKET_INVALID)
> +       num = packet_alloc_multi(pool, len, &pkt, 1);
> +       if (num != 1)
>                 return -1;
>
>         pkt_hdr = odp_packet_hdr(pkt);
> diff --git a/platform/linux-generic/pktio/pcap.c b/platform/linux-generic/pktio/pcap.c
> index be9049a..f6db809 100644
> --- a/platform/linux-generic/pktio/pcap.c
> +++ b/platform/linux-generic/pktio/pcap.c
> @@ -224,19 +224,9 @@ static int pcapif_recv_pkt(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>             pktio_entry->s.config.pktin.bit.ts_ptp)
>                 ts = &ts_val;
>
> -       pkt = ODP_PACKET_INVALID;
> -       pkt_len = 0;
> -
>         for (i = 0; i < len; ) {
>                 int ret;
>
> -               if (pkt == ODP_PACKET_INVALID) {
> -                       pkt = packet_alloc(pcap->pool, 0 /*default len*/, 1);
> -                       if (odp_unlikely(pkt == ODP_PACKET_INVALID))
> -                               break;
> -                       pkt_len = odp_packet_len(pkt);
> -               }
> -
>                 ret = pcap_next_ex(pcap->rx, &hdr, &data);
>
>                 /* end of file, attempt to reopen if within loop limit */
> @@ -246,17 +236,17 @@ static int pcapif_recv_pkt(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>                 if (ret != 1)
>                         break;
>
> +               pkt_len = hdr->caplen;
> +
> +               ret = packet_alloc_multi(pcap->pool, pkt_len, &pkt, 1);
> +               if (odp_unlikely(ret != 1))
> +                       break;
> +
>                 if (ts != NULL)
>                         ts_val = odp_time_global();
>
>                 pkt_hdr = odp_packet_hdr(pkt);
>
> -               if (!odp_packet_pull_tail(pkt, pkt_len - hdr->caplen)) {
> -                       ODP_ERR("failed to pull tail: pkt_len: %d caplen: %d\n",
> -                               pkt_len, hdr->caplen);
> -                       break;
> -               }
> -
>                 if (odp_packet_copy_from_mem(pkt, 0, hdr->caplen, data) != 0) {
>                         ODP_ERR("failed to copy packet data\n");
>                         break;
> @@ -269,7 +259,6 @@ static int pcapif_recv_pkt(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>                 pkt_hdr->input = pktio_entry->s.handle;
>
>                 pkts[i] = pkt;
> -               pkt = ODP_PACKET_INVALID;
>
>                 i++;
>         }
> @@ -277,9 +266,6 @@ static int pcapif_recv_pkt(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>
>         odp_ticketlock_unlock(&pktio_entry->s.rxl);
>
> -       if (pkt != ODP_PACKET_INVALID)
> -               odp_packet_free(pkt);
> -
>         return i;
>  }
>
> diff --git a/platform/linux-generic/pktio/socket.c b/platform/linux-generic/pktio/socket.c
> index 5d85ef5..58d9c5c 100644
> --- a/platform/linux-generic/pktio/socket.c
> +++ b/platform/linux-generic/pktio/socket.c
> @@ -657,6 +657,7 @@ static int sock_mmsg_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>                         void *base = msgvec[i].msg_hdr.msg_iov->iov_base;
>                         struct ethhdr *eth_hdr = base;
>                         uint16_t pkt_len = msgvec[i].msg_len;
> +                       int num;
>
>                         /* Don't receive packets sent by ourselves */
>                         if (odp_unlikely(ethaddrs_equal(pkt_sock->if_mac,
> @@ -666,8 +667,8 @@ static int sock_mmsg_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>                         if (cls_classify_packet(pktio_entry, base, pkt_len,
>                                                 pkt_len, &pool, &parsed_hdr))
>                                 continue;
> -                       pkt = packet_alloc(pool, pkt_len, 1);
> -                       if (pkt == ODP_PACKET_INVALID)
> +                       num = packet_alloc_multi(pool, pkt_len, &pkt, 1);
> +                       if (num != 1)
>                                 continue;
>
>                         pkt_hdr = odp_packet_hdr(pkt);
> @@ -688,10 +689,15 @@ static int sock_mmsg_recv(pktio_entry_t *pktio_entry, int index ODP_UNUSED,
>                                    [ODP_BUFFER_MAX_SEG];
>
>                 for (i = 0; i < (int)len; i++) {
> -                       pkt_table[i] = packet_alloc(pkt_sock->pool,
> -                                                   0 /*default*/, 1);
> -                       if (odp_unlikely(pkt_table[i] == ODP_PACKET_INVALID))
> +                       int num;
> +
> +                       num = packet_alloc_multi(pkt_sock->pool,
> +                                                1518 /* max eth frame len */,
> +                                                &pkt_table[i], 1);
> +                       if (odp_unlikely(num != 1)) {
> +                               pkt_table[i] = ODP_PACKET_INVALID;
>                                 break;
> +                       }
>
>                         msgvec[i].msg_hdr.msg_iovlen =
>                                 _rx_pkt_to_iovec(pkt_table[i], iovecs[i]);
> diff --git a/platform/linux-generic/pktio/socket_mmap.c b/platform/linux-generic/pktio/socket_mmap.c
> index 11bb7d6..9e84e4a 100644
> --- a/platform/linux-generic/pktio/socket_mmap.c
> +++ b/platform/linux-generic/pktio/socket_mmap.c
> @@ -169,6 +169,7 @@ static inline unsigned pkt_mmap_v2_rx(pktio_entry_t *pktio_entry,
>                 odp_packet_hdr_t *hdr;
>                 odp_packet_hdr_t parsed_hdr;
>                 odp_pool_t pool = pkt_sock->pool;
> +               int num;
>
>                 if (!mmap_rx_kernel_ready(ring->rd[frame_num].iov_base))
>                         break;
> @@ -206,8 +207,10 @@ static inline unsigned pkt_mmap_v2_rx(pktio_entry_t *pktio_entry,
>                         }
>                 }
>
> -               pkt_table[nb_rx] = packet_alloc(pool, pkt_len, 1);
> -               if (odp_unlikely(pkt_table[nb_rx] == ODP_PACKET_INVALID)) {
> +               num = packet_alloc_multi(pool, pkt_len, &pkt_table[nb_rx], 1);
> +
> +               if (odp_unlikely(num != 1)) {
> +                       pkt_table[nb_rx] = ODP_PACKET_INVALID;
>                         mmap_rx_user_ready(ppd.raw); /* drop */
>                         frame_num = next_frame_num;
>                         continue;
> diff --git a/platform/linux-generic/pktio/tap.c b/platform/linux-generic/pktio/tap.c
> index a9a8886..d758a39 100644
> --- a/platform/linux-generic/pktio/tap.c
> +++ b/platform/linux-generic/pktio/tap.c
> @@ -185,11 +185,12 @@ static odp_packet_t pack_odp_pkt(pktio_entry_t *pktio_entry, const void *data,
>  {
>         odp_packet_t pkt;
>         odp_packet_hdr_t *pkt_hdr;
> +       int num;
>
> -       pkt = packet_alloc(pktio_entry->s.pkt_tap.pool, len, 1);
> +       num = packet_alloc_multi(pktio_entry->s.pkt_tap.pool, len, &pkt, 1);
>
> -       if (pkt == ODP_PACKET_INVALID)
> -               return pkt;
> +       if (num != 1)
> +               return ODP_PACKET_INVALID;
>
>         if (odp_packet_copy_from_mem(pkt, 0, len, data) < 0) {
>                 ODP_ERR("failed to copy packet data\n");
> --
> 2.8.1