On 6/29/23 17:43, Eelco Chaudron wrote:
>
>
> On 22 Jun 2023, at 0:32, Ilya Maximets wrote:
>
>> Current implementation of meters in the userspace datapath takes
>> the meter lock for every packet batch. If more than one thread
>> hits the flow with the same meter, they will lock each other.
>>
>> Replace the critical section with atomic operations to avoid
>> interlocking. Meters themselves are RCU-protected, so it's safe
>> to access them without holding a lock.
>>
>> Implementation does the following:
>>
>> 1. Tries to advance the 'used' timer of the meter with atomic
>> compare+exchange if it's smaller than 'now'.
>> 2. If the timer change succeeds, atomically update band buckets.
>> 3. Atomically update packet statistics for a meter.
>> 4. Go over buckets and try to atomically subtract the amount of
>> packets or bytes, recording the highest exceeded band.
>> 5. Atomically update band statistics and drop packets.
>>
>> Bucket manipulations are implemented with atomic compare+exchange
>> operations with extra checks, because bucket size should never
>> exceed the maximum and it should never go below zero.
>>
>> Packet statistics may be momentarily inconsistent, i.e., the number
>> of packets and the number of bytes may reflect different sets
>> of packets. But they should be eventually consistent, and the
>> difference at any given time should be just a few packets.
>>
>> For the sake of reduced code complexity, the PKTPS meter tries to push
>> packets through the band one by one, even though they all have
>> the same weight. This is also more fair if more than one thread
>> is passing packets through the same band at the same time.
>> Trying to predict the number of packets that can pass may also
>> cause extra atomic operations reducing the performance.
>>
>> This implementation shows similar performance to the previous one,
>> but should scale better with more threads hitting the same meter.
>
> This work looks great!! Some small comments below. Did limited testing and
> it seems to work fine.
>
> Cheers,
>
> Eelco
>
>> Signed-off-by: Ilya Maximets <[email protected]>
>> ---
>>
>> @Lin Huang, if you can try this change on your setup, that
>> would be great.
>>
>> NEWS | 2 +
>> lib/dpif-netdev.c | 250 +++++++++++++++++++++++++---------------------
>> 2 files changed, 140 insertions(+), 112 deletions(-)
>>
>> diff --git a/NEWS b/NEWS
>> index 66d5a4ea3..4a2b7dbca 100644
>> --- a/NEWS
>> +++ b/NEWS
>> @@ -37,6 +37,8 @@ Post-v3.1.0
>> - SRv6 Tunnel Protocol
>> * Added support for userspace datapath (only).
>> - Userspace datapath:
>> + * Implementation of OpenFlow meters is now lockless allowing for better
>> + multi-thread scalability.
>> * IP and L4 checksum offload support is now enabled by default for
>> interfaces that support it. See the 'status' column in the
>> 'interface'
>> table to check the status.
>> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>> index abe63412e..2fa556a62 100644
>> --- a/lib/dpif-netdev.c
>> +++ b/lib/dpif-netdev.c
>> @@ -212,21 +212,21 @@ static void dpcls_remove(struct dpcls *, struct
>> dpcls_rule *);
>> struct dp_meter_band {
>> uint32_t rate;
>> uint32_t burst_size;
>> - uint64_t bucket; /* In 1/1000 packets (for PKTPS), or in bits (for
>> KBPS) */
>> - uint64_t packet_count;
>> - uint64_t byte_count;
>> + atomic_uint64_t bucket; /* In 1/1000 packets for PKTPS,
>> + * or in bits for KBPS. */
>> + atomic_uint64_t packet_count;
>> + atomic_uint64_t byte_count;
>> };
>>
>> struct dp_meter {
>> struct cmap_node node;
>> - struct ovs_mutex lock;
>> uint32_t id;
>> uint16_t flags;
>> uint16_t n_bands;
>> uint32_t max_delta_t;
>> - uint64_t used;
>> - uint64_t packet_count;
>> - uint64_t byte_count;
>> + atomic_uint64_t used; /* Time of a last use in milliseconds. */
>> + atomic_uint64_t packet_count;
>> + atomic_uint64_t byte_count;
>> struct dp_meter_band bands[];
>> };
>>
>> @@ -7165,22 +7165,56 @@ dpif_netdev_meter_get_features(const struct dpif *
>> dpif OVS_UNUSED,
>> features->max_color = 0;
>> }
>>
>> +/* Tries to atomically add 'n' to 'value' in terms of saturation arithmetic,
>> + * i.e., if the result would be larger than 'max_value', stores 'max_value'
>> + * instead.  The limit is checked against the remaining headroom
>> + * ('max_value - current') instead of computing 'current + n' first, so the
>> + * result still saturates if that sum would wrap around UINT64_MAX.
>> + *
>> + * If the compare+exchange fails, 'current' holds the value another thread
>> + * stored and the new value is recomputed from it. */
>> +static void
>> +atomic_sat_add(atomic_uint64_t *value, uint64_t n, uint64_t max_value)
>> +{
>> +    uint64_t current, new_value;
>> +
>> +    atomic_read_relaxed(value, &current);
>> +    do {
>> +        if (current >= max_value || n >= max_value - current) {
>> +            new_value = max_value;
>> +        } else {
>> +            new_value = current + n;
>> +        }
>> +    } while (!atomic_compare_exchange_weak_relaxed(value, &current,
>> +                                                   new_value));
>> +}
>> +
>> +/* Tries to atomically subtract 'n' from 'value'.  Does not perform the
>> + * operation and returns 'false' if the result would be less than
>> + * 'min_value'.  Otherwise, stores the result and returns 'true'.
>> + *
>> + * The bound is checked as 'current - min_value < n' (after making sure
>> + * 'current >= min_value') rather than 'current < min_value + n', so a large
>> + * 'n' or 'min_value' cannot wrap around and let the subtraction through.
>> + *
>> + * If the compare+exchange fails, 'current' holds the value another thread
>> + * stored and the bound is re-checked against it. */
>> +static bool
>> +atomic_bound_sub(atomic_uint64_t *value, uint64_t n, uint64_t min_value)
>> +{
>> +    uint64_t current;
>> +
>> +    atomic_read_relaxed(value, &current);
>> +    do {
>> +        if (current < min_value || current - min_value < n) {
>> +            return false;
>> +        }
>> +    } while (!atomic_compare_exchange_weak_relaxed(value, &current,
>> +                                                   current - n));
>> +    return true;
>> +}
>> +
>> /* Applies the meter identified by 'meter_id' to 'packets_'. Packets
>> * that exceed a band are dropped in-place. */
>> static void
>> dp_netdev_run_meter(struct dp_netdev *dp, struct dp_packet_batch *packets_,
>> - uint32_t meter_id, long long int now)
>> + uint32_t meter_id, long long int now_ms)
>> {
>> - struct dp_meter *meter;
>> - struct dp_meter_band *band;
>> - struct dp_packet *packet;
>> - long long int long_delta_t; /* msec */
>> - uint32_t delta_t; /* msec */
>> const size_t cnt = dp_packet_batch_size(packets_);
>> - uint32_t bytes, volume;
>> - int exceeded_band[NETDEV_MAX_BURST];
>> uint32_t exceeded_rate[NETDEV_MAX_BURST];
>> - int exceeded_pkt = cnt; /* First packet that exceeded a band rate. */
>> + uint32_t exceeded_band[NETDEV_MAX_BURST];
>> + uint64_t bytes, volume, meter_used, old;
>> + uint64_t band_packets[MAX_BANDS];
>> + uint64_t band_bytes[MAX_BANDS];
>> + struct dp_meter_band *band;
>> + struct dp_packet *packet;
>> + struct dp_meter *meter;
>> + bool exceeded = false;
>>
>> if (meter_id >= MAX_METERS) {
>> return;
>> @@ -7196,116 +7230,102 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct
>> dp_packet_batch *packets_,
>> /* Initialize as zeroes. */
>> memset(exceeded_rate, 0, cnt * sizeof *exceeded_rate);
>>
>> - ovs_mutex_lock(&meter->lock);
>> - /* All packets will hit the meter at the same time. */
>> - long_delta_t = now / 1000 - meter->used / 1000; /* msec */
>> + atomic_read_relaxed(&meter->used, &meter_used);
>> + do {
>> + if (meter_used >= now_ms) {
>> + /* The '>' condition means that we have several threads hitting
>> the
>> + * same meter, and the other one already advanced the time. */
>> + meter_used = now_ms;
>> + break;
>> + }
>> + } while (!atomic_compare_exchange_weak_relaxed(&meter->used,
>> + &meter_used, now_ms));
>>
>> - if (long_delta_t < 0) {
>> - /* This condition means that we have several threads fighting for a
>> - meter lock, and the one who received the packets a bit later
>> wins.
>> - Assuming that all racing threads received packets at the same
>> time
>> - to avoid overflow. */
>> - long_delta_t = 0;
>> - }
>> + /* Refill all buckets right away, since other threads may use them. */
>> + if (meter_used < now_ms) {
>> + /* All packets will hit the meter at the same time. */
>> + uint64_t delta_t = now_ms - meter_used;
>> +
>> + /* Make sure delta_t will not be too large, so that bucket will not
>> + * wrap around below. */
>> + delta_t = MIN(delta_t, meter->max_delta_t);
>>
>> - /* Make sure delta_t will not be too large, so that bucket will not
>> - * wrap around below. */
>> - delta_t = (long_delta_t > (long long int)meter->max_delta_t)
>> - ? meter->max_delta_t : (uint32_t)long_delta_t;
>> + for (int m = 0; m < meter->n_bands; m++) {
>> + band = &meter->bands[m];
>> + /* Update band's bucket. We can't just use atomic add here,
>> + * because we should never add above the max capacity. */
>> + atomic_sat_add(&band->bucket, delta_t * band->rate,
>> + band->burst_size * 1000ULL);
>> + }
>> + }
>>
>> /* Update meter stats. */
>> - meter->used = now;
>> - meter->packet_count += cnt;
>> + atomic_add_relaxed(&meter->packet_count, cnt, &old);
>> bytes = 0;
>> DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
>> bytes += dp_packet_size(packet);
>> }
>> - meter->byte_count += bytes;
>> + atomic_add_relaxed(&meter->byte_count, bytes, &old);
>>
>> /* Meters can operate in terms of packets per second or kilobits per
>> * second. */
>> if (meter->flags & OFPMF13_PKTPS) {
>> - /* Rate in packets/second, bucket 1/1000 packets. */
>> - /* msec * packets/sec = 1/1000 packets. */
>> + /* Rate in packets/second, bucket 1/1000 packets.
>> + * msec * packets/sec = 1/1000 packets. */
>> volume = cnt * 1000; /* Take 'cnt' packets from the bucket. */
>> } else {
>> - /* Rate in kbps, bucket in bits. */
>> - /* msec * kbps = bits */
>> + /* Rate in kbps, bucket in bits.
>> + * msec * kbps = bits */
>> volume = bytes * 8;
>> }
>>
>> - /* Update all bands and find the one hit with the highest rate for each
>> - * packet (if any). */
>> - for (int m = 0; m < meter->n_bands; ++m) {
>> - uint64_t max_bucket_size;
>> -
>> + /* Find the band hit with the highest rate for each packet (if any). */
>> + for (int m = 0; m < meter->n_bands; m++) {
>> band = &meter->bands[m];
>> - max_bucket_size = band->burst_size * 1000ULL;
>> - /* Update band's bucket. */
>> - band->bucket += (uint64_t) delta_t * band->rate;
>> - if (band->bucket > max_bucket_size) {
>> - band->bucket = max_bucket_size;
>> - }
>>
>> /* Drain the bucket for all the packets, if possible. */
>> - if (band->bucket >= volume) {
>> - band->bucket -= volume;
>> - } else {
>> - int band_exceeded_pkt;
>> -
>> - /* Band limit hit, must process packet-by-packet. */
>> - if (meter->flags & OFPMF13_PKTPS) {
>> - band_exceeded_pkt = band->bucket / 1000;
>> - band->bucket %= 1000; /* Remainder stays in bucket. */
>> -
>> - /* Update the exceeding band for each exceeding packet.
>> - * (Only one band will be fired by a packet, and that
>> - * can be different for each packet.) */
>> - for (int i = band_exceeded_pkt; i < cnt; i++) {
>> - if (band->rate > exceeded_rate[i]) {
>> - exceeded_rate[i] = band->rate;
>> - exceeded_band[i] = m;
>> - }
>> - }
>> - } else {
>> - /* Packet sizes differ, must process one-by-one. */
>> - band_exceeded_pkt = cnt;
>> - DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
>> - uint32_t bits = dp_packet_size(packet) * 8;
>> -
>> - if (band->bucket >= bits) {
>> - band->bucket -= bits;
>> - } else {
>> - if (i < band_exceeded_pkt) {
>> - band_exceeded_pkt = i;
>> - }
>> - /* Update the exceeding band for the exceeding
>> packet.
>> - * (Only one band will be fired by a packet, and
>> that
>> - * can be different for each packet.) */
>> - if (band->rate > exceeded_rate[i]) {
>> - exceeded_rate[i] = band->rate;
>> - exceeded_band[i] = m;
>> - }
>> - }
>> + if (atomic_bound_sub(&band->bucket, volume, 0)) {
>> + continue;
>> + }
>> +
>> + /* Band limit hit, must process packet-by-packet. */
>> + DP_PACKET_BATCH_FOR_EACH (i, packet, packets_) {
>> + uint64_t packet_volume = (meter->flags & OFPMF13_PKTPS)
>> + ? 1000 : (dp_packet_size(packet) * 8);
>> +
>> + if (!atomic_bound_sub(&band->bucket, packet_volume, 0)) {
>> + /* Update the exceeding band for the exceeding packet.
>> + * Only one band will be fired by a packet, and that can
>> + * be different for each packet. */
>> + if (band->rate > exceeded_rate[i]) {
>> + exceeded_rate[i] = band->rate;
>> + exceeded_band[i] = m;
>> + exceeded = true;
>> }
>> }
>> - /* Remember the first exceeding packet. */
>> - if (exceeded_pkt > band_exceeded_pkt) {
>> - exceeded_pkt = band_exceeded_pkt;
>> - }
>> }
>> }
>>
>> + /* No need to iterate over packets if there are no drops. */
>> + if (!exceeded) {
>> + return;
>> + }
>> +
>> /* Fire the highest rate band exceeded by each packet, and drop
>> * packets if needed. */
>> +
>
> Don’t think we need this newline.
The comment above applies to the whole section below. Without an empty
line it looks like it's a comment for memsets, and it is not.
I'd like to keep it, but can remove if you insist. :)
>
>> + memset(band_packets, 0, sizeof band_packets);
>> + memset(band_bytes, 0, sizeof band_bytes);
>
> Are these extra spaces for alignment on purpose?
Yes, seemed easier to read this way.
>
>> +
>> size_t j;
>> DP_PACKET_BATCH_REFILL_FOR_EACH (j, cnt, packet, packets_) {
>> - if (exceeded_band[j] >= 0) {
>> + uint32_t m = exceeded_band[j];
>> +
>> + if (m != UINT32_MAX) {
>> /* Meter drop packet. */
>> - band = &meter->bands[exceeded_band[j]];
>> - band->packet_count += 1;
>> - band->byte_count += dp_packet_size(packet);
>> - COVERAGE_INC(datapath_drop_meter);
>> + band = &meter->bands[m];
>> + band_packets[m]++;
>> + band_bytes[m] += dp_packet_size(packet);
>
>
> This code now looks like this (the diff is a mess to comment on):
>
> if (m != UINT32_MAX) {
> /* Meter drop packet. */
> band = &meter->bands[m];
>
> ! ^^^ This line can be removed as band is not used.
True. Can remove.
>
> band_packets[m]++;
> band_bytes[m] += dp_packet_size(packet);
> dp_packet_delete(packet);
> } else {
> /* Meter accepts packet. */
> dp_packet_batch_refill(packets_, packet, j);
> }
> }
>
>> dp_packet_delete(packet);
>> } else {
>> /* Meter accepts packet. */
>> @@ -7313,7 +7333,15 @@ dp_netdev_run_meter(struct dp_netdev *dp, struct
>> dp_packet_batch *packets_,
>> }
>> }
>>
>> - ovs_mutex_unlock(&meter->lock);
>> + for (int m = 0; m < meter->n_bands; m++) {
>> + if (!band_packets[m]) {
>> + continue;
>> + }
>> + band = &meter->bands[m];
>> + atomic_add_relaxed(&band->packet_count, band_packets[m], &old);
>> + atomic_add_relaxed(&band->byte_count, band_bytes[m], &old);
>
> Are these extra spaces for alignment on purpose?
Yep, same as with memset. This code is a bit complex and alignment
makes it easier to read, IMO.
>
>> + COVERAGE_ADD(datapath_drop_meter, band_packets[m]);
>> + }
>> }
>>
>> /* Meter set/get/del processing is still single-threaded. */
>> @@ -7354,13 +7382,13 @@ dpif_netdev_meter_set(struct dpif *dpif,
>> ofproto_meter_id meter_id,
>> meter->flags = config->flags;
>> meter->n_bands = config->n_bands;
>> meter->max_delta_t = 0;
>> - meter->used = time_usec();
>> meter->id = mid;
>> - ovs_mutex_init_adaptive(&meter->lock);
>> + atomic_init(&meter->used, time_msec());
>>
>> /* set up bands */
>> for (i = 0; i < config->n_bands; ++i) {
>> uint32_t band_max_delta_t;
>> + uint64_t bucket_size;
>>
>> /* Set burst size to a workable value if none specified. */
>> if (config->bands[i].burst_size == 0) {
>> @@ -7370,11 +7398,11 @@ dpif_netdev_meter_set(struct dpif *dpif,
>> ofproto_meter_id meter_id,
>> meter->bands[i].rate = config->bands[i].rate;
>> meter->bands[i].burst_size = config->bands[i].burst_size;
>> /* Start with a full bucket. */
>> - meter->bands[i].bucket = meter->bands[i].burst_size * 1000ULL;
>> + bucket_size = meter->bands[i].burst_size * 1000ULL;
>> + atomic_init(&meter->bands[i].bucket, bucket_size);
>>
>> /* Figure out max delta_t that is enough to fill any bucket. */
>> - band_max_delta_t
>> - = meter->bands[i].bucket / meter->bands[i].rate;
>> + band_max_delta_t = bucket_size / meter->bands[i].rate;
>> if (band_max_delta_t > meter->max_delta_t) {
>> meter->max_delta_t = band_max_delta_t;
>> }
>> @@ -7397,7 +7425,7 @@ dpif_netdev_meter_get(const struct dpif *dpif,
>> {
>> struct dp_netdev *dp = get_dp_netdev(dpif);
>> uint32_t meter_id = meter_id_.uint32;
>> - const struct dp_meter *meter;
>> + struct dp_meter *meter;
>>
>> if (meter_id >= MAX_METERS) {
>> return EFBIG;
>> @@ -7411,17 +7439,15 @@ dpif_netdev_meter_get(const struct dpif *dpif,
>> if (stats) {
>> int i = 0;
>>
>> - ovs_mutex_lock(&meter->lock);
>> -
>> - stats->packet_in_count = meter->packet_count;
>> - stats->byte_in_count = meter->byte_count;
>> + atomic_read_relaxed(&meter->packet_count, &stats->packet_in_count);
>> + atomic_read_relaxed(&meter->byte_count, &stats->byte_in_count);
>>
>> for (i = 0; i < n_bands && i < meter->n_bands; ++i) {
>> - stats->bands[i].packet_count = meter->bands[i].packet_count;
>> - stats->bands[i].byte_count = meter->bands[i].byte_count;
>> + atomic_read_relaxed(&meter->bands[i].packet_count,
>> + &stats->bands[i].packet_count);
>> + atomic_read_relaxed(&meter->bands[i].byte_count,
>> + &stats->bands[i].byte_count);
>> }
>> -
>> - ovs_mutex_unlock(&meter->lock);
>> stats->n_bands = i;
>> }
>>
>> @@ -9173,7 +9199,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch
>> *packets_,
>>
>> case OVS_ACTION_ATTR_METER:
>> dp_netdev_run_meter(pmd->dp, packets_, nl_attr_get_u32(a),
>> - pmd->ctx.now);
>> + pmd->ctx.now / 1000);
>> break;
>>
>> case OVS_ACTION_ATTR_PUSH_VLAN:
>> --
>> 2.40.1
>
_______________________________________________
dev mailing list
[email protected]
https://mail.openvswitch.org/mailman/listinfo/ovs-dev