On 27/10/2020 20:29, Flavio Leitner wrote:

Thanks again for the review Flavio. Comments below.

v10 @
https://mail.openvswitch.org/pipermail/ovs-dev/2020-October/376730.html

> On Fri, Sep 18, 2020 at 02:48:17PM -0400, Mark Gray wrote:
>> From: Aaron Conole <[email protected]>
>>
>> Currently, the channel handlers are polled globally.  On some
>> systems, this causes a thundering herd issue where multiple
>> handler threads become active, only to do no work and immediately
>> sleep.
>>
>> The approach here is to push the netlink socket channels to discreet
>> handler threads to process, rather than polling on every thread.
>> This will eliminate the need to wake multiple threads.
>>
>> To check:
>>
>>   ip netns add left
>>   ip netns add right
>>   ip link add center-left type veth peer name left0 netns left
>>   ip link add center-right type veth peer name right0 netns right
>>   ip link set center-left up
>>   ip link set center-right up
>>   ip -n left ip link set left0 up
>>   ip -n left ip addr add 172.31.110.10/24 dev left0
>>   ip -n right ip link set right0 up
>>   ip -n right ip addr add 172.31.110.11/24 dev right0
>>
>>   ovs-vsctl add-br br0
>>   ovs-vsctl add-port br0 center-right
>>   ovs-vsctl add-port br0 center-left
>>
>>   # in one terminal
>>   perf record -e sched:sched_wakeup,irq:softirq_entry -ag
>>
>>   # in a separate terminal
>>   ip netns exec left arping -I left0 -c 1 172.31.110.11
>>
>>   # in the perf terminal after exiting
>>   perf script
>>
>> Look for the number of 'handler' threads which were made active.
>>
>> Suggested-by: Ben Pfaff <[email protected]>
>> Suggested-by: Flavio Leitner <[email protected]>
>> Co-authored-by: Mark Gray <[email protected]>
>> Reported-by: David Ahern <[email protected]>
>> Reported-at: 
>> https://mail.openvswitch.org/pipermail/ovs-dev/2019-December/365857.html
>> Cc: Matteo Croce <[email protected]>
>> Fixes: 69c51582f ("dpif-netlink: don't allocate per thread netlink sockets")
>> Signed-off-by: Aaron Conole <[email protected]>
>> Signed-off-by: Mark Gray <[email protected]>
>> ---
>>
>> v2: Oops - forgot to commit my whitespace cleanups.
>> v3: one port latency results as per Matteo's comments
>>
>>     Stock:
>>       min/avg/max/mdev = 21.5/36.5/96.5/1.0 us
>>     With Patch:
>>       min/avg/max/mdev = 5.3/9.7/98.4/0.5 us 
>> v4: Oops - first email did not send
>> v5:   min/avg/max/mdev = 9.3/10.4/33.6/2.2 us
>> v6: Split out the AUTHORS patch and added a cover letter as
>>     per Ilya's suggestion.
>>     Fixed 0-day issues.
>> v7: Merged patches as per Flavio's suggestion. This is
>>     no longer a series. Fixed some other small issues.
>> v9: Udated based on feedback from Ilya. Also implemented
>>     another suggestion by Flavio
>>       min/avg/max/mdev = 5.5/11.0/83.2/0.8 us
>>
>>  lib/dpif-netlink.c | 309 ++++++++++++++++++++++-----------------------
>>  1 file changed, 149 insertions(+), 160 deletions(-)
>>
>> diff --git a/lib/dpif-netlink.c b/lib/dpif-netlink.c
>> index 7da4fb54d..7e4823ea0 100644
>> --- a/lib/dpif-netlink.c
>> +++ b/lib/dpif-netlink.c
>> @@ -160,8 +160,15 @@ static void dpif_netlink_flow_to_dpif_flow(struct 
>> dpif_flow *,
>>  
>>  /* One of the dpif channels between the kernel and userspace. */
>>  struct dpif_channel {
>> -    struct nl_sock *sock;       /* Netlink socket. */
>> -    long long int last_poll;    /* Last time this channel was polled. */
>> +    struct hmap_node handler_node; /* Used by 'struct dpif_handler' to track
>> +                                      all channels associated with a
>> +                                      handler. */
>> +    struct hmap_node dpif_node;    /* Used by 'struct dpif' to track all 
>> dpif
>> +                                      channels and their associated port. */
>> +    struct dpif_handler *handler;  /* Pointer to owning 'struct 
>> dpif_handler'*/
>> +    odp_port_t port_no;            /* Datapath port number of this channel. 
>> */
>> +    struct nl_sock *sock;          /* Netlink socket. */
>> +    long long int last_poll;       /* Last time this channel was polled. */
>>  };
>>  
>>  #ifdef _WIN32
>> @@ -183,6 +190,9 @@ struct dpif_handler {
>>      int n_events;                 /* Num events returned by epoll_wait(). */
>>      int event_offset;             /* Offset into 'epoll_events'. */
>>  
>> +    struct hmap channels;         /* Map of channels for each port 
>> associated
>> +                                     with this handler. */
>> +
>>  #ifdef _WIN32
>>      /* Pool of sockets. */
>>      struct dpif_windows_vport_sock *vport_sock_pool;
>> @@ -201,9 +211,7 @@ struct dpif_netlink {
>>      struct fat_rwlock upcall_lock;
>>      struct dpif_handler *handlers;
>>      uint32_t n_handlers;           /* Num of upcall handlers. */
>> -    struct dpif_channel *channels; /* Array of channels for each port. */
>> -    int uc_array_size;             /* Size of 'handler->channels' and */
>> -                                   /* 'handler->epoll_events'. */
>> +    struct hmap channels;          /* Map of all channels. */
>>  
>>      /* Change notification. */
>>      struct nl_sock *port_notifier; /* vport multicast group subscriber. */
>> @@ -287,6 +295,44 @@ close_nl_sock(struct nl_sock *sock)
>>  #endif
>>  }
>>  
>> +static size_t
>> +dpif_handler_channels_count(const struct dpif_handler *handler)
>> +{
>> +    return hmap_count(&handler->channels);
>> +}
>> +
>> +static struct dpif_channel *
>> +dpif_channel_find(const struct dpif_netlink *dpif, odp_port_t port_no)
>> +{
>> +    struct dpif_channel *channel;
>> +
>> +    HMAP_FOR_EACH_WITH_HASH (channel, dpif_node,
>> +                             hash_int(odp_to_u32(port_no), 0),
>> +                             &dpif->channels) {
>> +        if (channel->port_no == port_no) {
>> +            return channel;
>> +        }
>> +    }
>> +
>> +    return NULL;
>> +}
>> +
>> +static void
>> +dpif_channel_del(struct dpif_netlink *dpif, struct dpif_channel *channel)
>> +{
>> +    struct dpif_handler *handler = channel->handler;
>> +
>> +#ifndef _WIN32
>> +    epoll_ctl(handler->epoll_fd, EPOLL_CTL_DEL,
>> +              nl_sock_fd(channel->sock), NULL);
>> +    nl_sock_destroy(channel->sock);
>> +#endif
>> +    hmap_remove(&handler->channels, &channel->handler_node);
>> +    hmap_remove(&dpif->channels, &channel->dpif_node);
>> +    handler->event_offset = handler->n_events = 0;
>> +    free(channel);
>> +}
>> +
>>  static struct dpif_netlink *
>>  dpif_netlink_cast(const struct dpif *dpif)
>>  {
>> @@ -385,6 +431,7 @@ open_dpif(const struct dpif_netlink_dp *dp, struct dpif 
>> **dpifp)
>>  
>>      dpif->dp_ifindex = dp->dp_ifindex;
>>      dpif->user_features = dp->user_features;
>> +    hmap_init(&dpif->channels);
>>      *dpifp = &dpif->dpif;
>>  
>>      return 0;
>> @@ -446,153 +493,122 @@ error:
>>  }
>>  #endif /* _WIN32 */
>>  
>> -/* Given the port number 'port_idx', extracts the pid of netlink socket
>> +/* Given the port number 'port_no', extracts the pid of netlink socket
>>   * associated to the port and assigns it to 'upcall_pid'. */
>>  static bool
>> -vport_get_pid(struct dpif_netlink *dpif, uint32_t port_idx,
>> +vport_get_pid(const struct dpif_netlink *dpif, odp_port_t port_no,
>>                uint32_t *upcall_pid)
>>  {
>> -    /* Since the nl_sock can only be assigned in either all
>> -     * or none "dpif" channels, the following check
>> -     * would suffice. */
>> -    if (!dpif->channels[port_idx].sock) {
>> -        return false;
>> -    }
>> -    ovs_assert(!WINDOWS || dpif->n_handlers <= 1);
>> +    struct dpif_channel *channel;
>>  
>> -    *upcall_pid = nl_sock_pid(dpif->channels[port_idx].sock);
>> +    ovs_assert(!WINDOWS || dpif->n_handlers <= 1);
>>  
>> -    return true;
>> +    channel = dpif_channel_find(dpif, port_no);
>> +    if (channel) {
>> +        *upcall_pid = nl_sock_pid(channel->sock);
>> +        return true;
>> +    }
>> +    return false;
>>  }
>>  
>>  static int
>>  vport_add_channel(struct dpif_netlink *dpif, odp_port_t port_no,
>>                    struct nl_sock *sock)
>>  {
>> +    struct dpif_channel *channel;
>> +    struct dpif_handler *handler;
>>      struct epoll_event event;
>> -    uint32_t port_idx = odp_to_u32(port_no);
>>      size_t i;
>> -    int error;
>>  
>> -    if (dpif->handlers == NULL) {
>> +    if (dpif->handlers == NULL || dpif_channel_find(dpif, port_no)) {
>>          close_nl_sock(sock);
> 
> I know this code was already there, but it seems odd to have
> this function to close the sock in case of error. It should
> be the parent, if needed, to do that.
> 
> Also, we have some returns below which don't do that, so it's
> not a common action.

Yeah, I will refactor that.

> 
> 
>> -        return 0;
>> +        return EINVAL;
>>      }
>>  
>> -    /* We assume that the datapath densely chooses port numbers, which can
>> -     * therefore be used as an index into 'channels' and 'epoll_events' of
>> -     * 'dpif'. */
>> -    if (port_idx >= dpif->uc_array_size) {
>> -        uint32_t new_size = port_idx + 1;
>> -
>> -        if (new_size > MAX_PORTS) {
>> -            VLOG_WARN_RL(&error_rl, "%s: datapath port %"PRIu32" too big",
>> -                         dpif_name(&dpif->dpif), port_no);
>> -            return EFBIG;
>> -        }
>> +    if (odp_to_u32(port_no) >= MAX_PORTS) {
>> +        VLOG_WARN_RL(&error_rl, "%s: datapath port %"PRIu32" too big",
>> +                     dpif_name(&dpif->dpif), port_no);
>> +        return EFBIG;
>> +    }
>>  
>> -        dpif->channels = xrealloc(dpif->channels,
>> -                                  new_size * sizeof *dpif->channels);
>> +    handler = &dpif->handlers[0];
>>  
>> -        for (i = dpif->uc_array_size; i < new_size; i++) {
>> -            dpif->channels[i].sock = NULL;
>> +    for (i = 0; i < dpif->n_handlers; i++) {
>> +        if (dpif_handler_channels_count(&dpif->handlers[i]) <
>> +            dpif_handler_channels_count(handler)) {
>> +            handler = &dpif->handlers[i];
>>          }
>> +    }
>>  
>> -        for (i = 0; i < dpif->n_handlers; i++) {
>> -            struct dpif_handler *handler = &dpif->handlers[i];
>> -
>> -            handler->epoll_events = xrealloc(handler->epoll_events,
>> -                new_size * sizeof *handler->epoll_events);
>> +    channel = xmalloc(sizeof *channel);
>> +    channel->port_no = port_no;
>> +    channel->sock = sock;
>> +    channel->last_poll = LLONG_MIN;
>> +    channel->handler = handler;
>> +    hmap_insert(&handler->channels, &channel->handler_node,
>> +                hash_int(odp_to_u32(port_no), 0));
>> +    hmap_insert(&dpif->channels, &channel->dpif_node,
>> +                hash_int(odp_to_u32(port_no), 0));
>>  
>> -        }
>> -        dpif->uc_array_size = new_size;
>> -    }
>> +    handler->epoll_events = xrealloc(handler->epoll_events,
>> +                                     dpif_handler_channels_count(handler) *
>> +                                     sizeof *handler->epoll_events);
> 
> The epoll_events is only needed by epoll_wait() which is only
> useful if epoll_ctl() succeed, so that xrealloc() could be
> done only after epoll_ctl() succeed.

Yes, I will move all that code down.

> 
>>      memset(&event, 0, sizeof event);
>>      event.events = EPOLLIN | EPOLLEXCLUSIVE;
>> -    event.data.u32 = port_idx;
>> -
>> -    for (i = 0; i < dpif->n_handlers; i++) {
>> -        struct dpif_handler *handler = &dpif->handlers[i];
>> +    event.data.u32 = odp_to_u32(port_no);
> 
> I think the above code initializing event could be moved to the
> #ifndef below, including the xrealloc() since they are all
> related and they are not large.
> 
Done
> 
>>  
>>  #ifndef _WIN32
>> -        if (epoll_ctl(handler->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
>> -                      &event) < 0) {
>> -            error = errno;
>> -            goto error;
>> -        }
>> -#endif
>> -    }
>> -    dpif->channels[port_idx].sock = sock;
>> -    dpif->channels[port_idx].last_poll = LLONG_MIN;
>> -
>> -    return 0;
>> -
>> -error:
>> -#ifndef _WIN32
>> -    while (i--) {
>> -        epoll_ctl(dpif->handlers[i].epoll_fd, EPOLL_CTL_DEL,
>> -                  nl_sock_fd(sock), NULL);
>> +    if (epoll_ctl(handler->epoll_fd, EPOLL_CTL_ADD, nl_sock_fd(sock),
>> +                  &event) < 0) {
>> +        hmap_remove(&handler->channels, &channel->handler_node);
>> +        hmap_remove(&dpif->channels, &channel->dpif_node);
>> +        free(channel);
> 
> Here is one example of returning error without closing the sock.

I moved closure of the socket outside this function.

> 
>> +        return errno;
>>      }
>>  #endif
>> -    dpif->channels[port_idx].sock = NULL;
>>  
>> -    return error;
>> +    return 0;
>>  }
>>  
>> -static void
>> -vport_del_channels(struct dpif_netlink *dpif, odp_port_t port_no)
>> +static bool
>> +vport_del_channel(struct dpif_netlink *dpif, odp_port_t port_no)
>>  {
>> -    uint32_t port_idx = odp_to_u32(port_no);
>> -    size_t i;
>> +    struct dpif_channel *channel;
>>  
>> -    if (!dpif->handlers || port_idx >= dpif->uc_array_size
>> -        || !dpif->channels[port_idx].sock) {
>> -        return;
>> +    channel = dpif_channel_find(dpif, port_no);
>> +    if (channel) {
>> +        dpif_channel_del(dpif, channel);
>> +        return true;
>>      }
>>  
>> -    for (i = 0; i < dpif->n_handlers; i++) {
>> -        struct dpif_handler *handler = &dpif->handlers[i];
>> -#ifndef _WIN32
>> -        epoll_ctl(handler->epoll_fd, EPOLL_CTL_DEL,
>> -                  nl_sock_fd(dpif->channels[port_idx].sock), NULL);
>> -#endif
>> -        handler->event_offset = handler->n_events = 0;
>> -    }
>> -#ifndef _WIN32
>> -    nl_sock_destroy(dpif->channels[port_idx].sock);
>> -#endif
>> -    dpif->channels[port_idx].sock = NULL;
>> +    return false;
>>  }
>>  
>>  static void
>>  destroy_all_channels(struct dpif_netlink *dpif)
>>      OVS_REQ_WRLOCK(dpif->upcall_lock)
>>  {
>> -    unsigned int i;
>> +    struct dpif_channel *channel, *next;
>> +    size_t i;
>>  
>>      if (!dpif->handlers) {
>>          return;
>>      }
>>  
>> -    for (i = 0; i < dpif->uc_array_size; i++ ) {
>> +    HMAP_FOR_EACH_SAFE (channel, next, dpif_node, &dpif->channels) {
>>          struct dpif_netlink_vport vport_request;
>>          uint32_t upcall_pids = 0;
>>  
>> -        if (!dpif->channels[i].sock) {
>> -            continue;
>> -        }
>> -
>>          /* Turn off upcalls. */
>>          dpif_netlink_vport_init(&vport_request);
>>          vport_request.cmd = OVS_VPORT_CMD_SET;
>>          vport_request.dp_ifindex = dpif->dp_ifindex;
>> -        vport_request.port_no = u32_to_odp(i);
>> +        vport_request.port_no = channel->port_no;
>>          vport_request.n_upcall_pids = 1;
>>          vport_request.upcall_pids = &upcall_pids;
>>          dpif_netlink_vport_transact(&vport_request, NULL, NULL);
>> -
>> -        vport_del_channels(dpif, u32_to_odp(i));
>> +        dpif_channel_del(dpif, channel);
>>      }
>>  
>>      for (i = 0; i < dpif->n_handlers; i++) {
>> @@ -601,12 +617,10 @@ destroy_all_channels(struct dpif_netlink *dpif)
>>          dpif_netlink_handler_uninit(handler);
>>          free(handler->epoll_events);
>>      }
>> -    free(dpif->channels);
>> +
>>      free(dpif->handlers);
>>      dpif->handlers = NULL;
>> -    dpif->channels = NULL;
>>      dpif->n_handlers = 0;
>> -    dpif->uc_array_size = 0;
>>  }
>>  
>>  static void
>> @@ -618,9 +632,11 @@ dpif_netlink_close(struct dpif *dpif_)
>>  
>>      fat_rwlock_wrlock(&dpif->upcall_lock);
>>      destroy_all_channels(dpif);
>> +    hmap_destroy(&dpif->channels);
>>      fat_rwlock_unlock(&dpif->upcall_lock);
>>  
>>      fat_rwlock_destroy(&dpif->upcall_lock);
>> +
>>      free(dpif);
>>  }
>>  
>> @@ -1002,9 +1018,6 @@ dpif_netlink_port_del__(struct dpif_netlink *dpif, 
>> odp_port_t port_no)
>>      }
>>  #endif
>>      error = dpif_netlink_vport_transact(&vport, NULL, NULL);
>> -
>> -    vport_del_channels(dpif, port_no);
>> -
>>      if (!error && !ovs_tunnels_out_of_tree) {
>>          error = dpif_netlink_rtnl_port_destroy(dpif_port.name, 
>> dpif_port.type);
>>          if (error == EOPNOTSUPP) {
>> @@ -1012,6 +1025,10 @@ dpif_netlink_port_del__(struct dpif_netlink *dpif, 
>> odp_port_t port_no)
>>          }
>>      }
>>  
>> +    if (!vport_del_channel(dpif, port_no)) {
>> +        error = EINVAL;
>> +    }
>> +
>>      dpif_port_destroy(&dpif_port);
>>  
>>      return error;
>> @@ -1080,41 +1097,27 @@ dpif_netlink_port_query_by_name(const struct dpif 
>> *dpif_, const char *devname,
>>  }
>>  
>>  static uint32_t
>> -dpif_netlink_port_get_pid__(const struct dpif_netlink *dpif,
>> -                            odp_port_t port_no)
>> -    OVS_REQ_RDLOCK(dpif->upcall_lock)
>> +dpif_netlink_port_get_pid(const struct dpif *dpif_, odp_port_t port)
>>  {
>> -    uint32_t port_idx = odp_to_u32(port_no);
>> +    const struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
>>      uint32_t pid = 0;
>> +    odp_port_t port_no = port;
> 
> 
> The port_no initialized above is initialized again right after
> that.

Fixed

> 
>>  
>> -    if (dpif->handlers && dpif->uc_array_size > 0) {
>> -        /* The ODPP_NONE "reserved" port number uses the "ovs-system"'s
>> -         * channel, since it is not heavily loaded. */
>> -        uint32_t idx = port_idx >= dpif->uc_array_size ? 0 : port_idx;
>> -
>> -        /* Needs to check in case the socket pointer is changed in between
>> -         * the holding of upcall_lock.  A known case happens when the main
>> -         * thread deletes the vport while the handler thread is handling
>> -         * the upcall from that port. */
>> -        if (dpif->channels[idx].sock) {
>> -            pid = nl_sock_pid(dpif->channels[idx].sock);
>> -        }
>> -    }
>> -
>> -    return pid;
>> -}
>> -
>> -static uint32_t
>> -dpif_netlink_port_get_pid(const struct dpif *dpif_, odp_port_t port_no)
>> -{
>> -    const struct dpif_netlink *dpif = dpif_netlink_cast(dpif_);
>> -    uint32_t ret;
>> +    /* The ODPP_NONE "reserved" port number uses the "ovs-system"'s
>> +     * channel, since it is not heavily loaded. */
>> +    port_no = port == ODPP_NONE ? u32_to_odp(0) : port;
> 
> Here.
> 
> 
>>  
>>      fat_rwlock_rdlock(&dpif->upcall_lock);
>> -    ret = dpif_netlink_port_get_pid__(dpif, port_no);
>> +    /* Try to find the PID associated with 'port no'. If we fail, we can 
>> fall
>> +       back to the system channel PID, or return 0 indicating a failure. */
>> +    if (!vport_get_pid(dpif, port_no, &pid)) {
>> +        if (!vport_get_pid(dpif, 0, &pid)) {
>> +            pid = 0;
>> +        }
>> +    }
>>      fat_rwlock_unlock(&dpif->upcall_lock);
>>  
>> -    return ret;
>> +    return pid;
>>  }
>>  
>>  static int
>> @@ -2342,12 +2345,14 @@ dpif_netlink_operate(struct dpif *dpif_, struct 
>> dpif_op **ops, size_t n_ops,
>>  static void
>>  dpif_netlink_handler_uninit(struct dpif_handler *handler)
>>  {
>> +    hmap_destroy(&handler->channels);
>>      vport_delete_sock_pool(handler);
>>  }
>>  
>>  static int
>>  dpif_netlink_handler_init(struct dpif_handler *handler)
>>  {
>> +    hmap_init(&handler->channels);
>>      return vport_create_sock_pool(handler);
>>  }
>>  #else
>> @@ -2355,6 +2360,7 @@ dpif_netlink_handler_init(struct dpif_handler *handler)
>>  static int
>>  dpif_netlink_handler_init(struct dpif_handler *handler)
>>  {
>> +    hmap_init(&handler->channels);
>>      handler->epoll_fd = epoll_create(10);
>>      return handler->epoll_fd < 0 ? errno : 0;
>>  }
>> @@ -2362,6 +2368,7 @@ dpif_netlink_handler_init(struct dpif_handler *handler)
>>  static void
>>  dpif_netlink_handler_uninit(struct dpif_handler *handler)
>>  {
>> +    hmap_destroy(&handler->channels);
>>      close(handler->epoll_fd);
>>  }
>>  #endif
>> @@ -2374,9 +2381,7 @@ static int
>>  dpif_netlink_refresh_channels(struct dpif_netlink *dpif, uint32_t 
>> n_handlers)
>>      OVS_REQ_WRLOCK(dpif->upcall_lock)
>>  {
>> -    unsigned long int *keep_channels;
>>      struct dpif_netlink_vport vport;
>> -    size_t keep_channels_nbits;
>>      struct nl_dump dump;
>>      uint64_t reply_stub[NL_DUMP_BUFSIZE / 8];
>>      struct ofpbuf buf;
>> @@ -2416,18 +2421,13 @@ dpif_netlink_refresh_channels(struct dpif_netlink 
>> *dpif, uint32_t n_handlers)
>>          handler->event_offset = handler->n_events = 0;
>>      }
>>  
>> -    keep_channels_nbits = dpif->uc_array_size;
>> -    keep_channels = bitmap_allocate(keep_channels_nbits);
>> -
>>      ofpbuf_use_stub(&buf, reply_stub, sizeof reply_stub);
>>      dpif_netlink_port_dump_start__(dpif, &dump);
>>      while (!dpif_netlink_port_dump_next__(dpif, &dump, &vport, &buf)) {
>> -        uint32_t port_no = odp_to_u32(vport.port_no);
>>          uint32_t upcall_pid;
>>          int error;
>>  
>> -        if (port_no >= dpif->uc_array_size
>> -            || !vport_get_pid(dpif, port_no, &upcall_pid)) {
>> +        if (!vport_get_pid(dpif, vport.port_no, &upcall_pid)) {
>>              struct nl_sock *sock;
>>              error = create_nl_sock(dpif, &sock);
>>  
>> @@ -2475,25 +2475,15 @@ dpif_netlink_refresh_channels(struct dpif_netlink 
>> *dpif, uint32_t n_handlers)
>>              }
>>          }
>>  
>> -        if (port_no < keep_channels_nbits) {
>> -            bitmap_set1(keep_channels, port_no);
>> -        }
>>          continue;
>>  
>>      error:
>> -        vport_del_channels(dpif, vport.port_no);
>> +        vport_del_channel(dpif, vport.port_no);
>>      }
>> +
>>      nl_dump_done(&dump);
>>      ofpbuf_uninit(&buf);
>>  
>> -    /* Discard any saved channels that we didn't reuse. */
>> -    for (i = 0; i < keep_channels_nbits; i++) {
>> -        if (!bitmap_is_set(keep_channels, i)) {
>> -            vport_del_channels(dpif, u32_to_odp(i));
>> -        }
>> -    }
>> -    free(keep_channels);
>> -
>>      return retval;
>>  }
>>  
>> @@ -2714,6 +2704,8 @@ dpif_netlink_recv__(struct dpif_netlink *dpif, 
>> uint32_t handler_id,
>>      OVS_REQ_RDLOCK(dpif->upcall_lock)
>>  {
>>      struct dpif_handler *handler;
>> +    size_t n_channels;
>> +
>>      int read_tries = 0;
>>  
>>      if (!dpif->handlers || handler_id >= dpif->n_handlers) {
>> @@ -2721,14 +2713,15 @@ dpif_netlink_recv__(struct dpif_netlink *dpif, 
>> uint32_t handler_id,
>>      }
>>  
>>      handler = &dpif->handlers[handler_id];
>> -    if (handler->event_offset >= handler->n_events) {
>> +    n_channels = dpif_handler_channels_count(handler);
>> +    if (handler->event_offset >= handler->n_events && n_channels) {
>>          int retval;
>>  
>>          handler->event_offset = handler->n_events = 0;
>>  
>>          do {
>>              retval = epoll_wait(handler->epoll_fd, handler->epoll_events,
>> -                                dpif->uc_array_size, 0);
>> +                                n_channels, 0);
>>          } while (retval < 0 && errno == EINTR);
>>  
>>          if (retval < 0) {
>> @@ -2741,7 +2734,9 @@ dpif_netlink_recv__(struct dpif_netlink *dpif, 
>> uint32_t handler_id,
>>  
>>      while (handler->event_offset < handler->n_events) {
>>          int idx = handler->epoll_events[handler->event_offset].data.u32;
>> -        struct dpif_channel *ch = &dpif->channels[idx];
>> +        struct dpif_channel *ch = dpif_channel_find(dpif, u32_to_odp(idx));
>> +        ovs_assert(ch != NULL);
>> +        ovs_assert(ch->handler == handler);
>>  
>>          handler->event_offset++;
>>  
>> @@ -2842,16 +2837,10 @@ static void
>>  dpif_netlink_recv_purge__(struct dpif_netlink *dpif)
>>      OVS_REQ_WRLOCK(dpif->upcall_lock)
>>  {
>> -    if (dpif->handlers) {
>> -        size_t i;
>> +    struct dpif_channel *channel;
>>  
>> -        if (!dpif->channels[0].sock) {
>> -            return;
>> -        }
>> -        for (i = 0; i < dpif->uc_array_size; i++ ) {
>> -
>> -            nl_sock_drain(dpif->channels[i].sock);
>> -        }
>> +    HMAP_FOR_EACH (channel, dpif_node, &dpif->channels) {
>> +        nl_sock_drain(channel->sock);
>>      }
>>  }
> 
> 
> The above function is so small now that I would suggest to
> add the code to dpif_netlink_recv_purge().

Ok

> 
> Thanks,
> 

_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev

Reply via email to