On 31 Jan 2024, at 18:03, Aaron Conole wrote:
> Eelco Chaudron <[email protected]> writes: > >> On 25 Jan 2024, at 21:55, Aaron Conole wrote: >> >>> From: Kevin Sprague <[email protected]> >>> >>> During normal operations, it is useful to understand when a particular flow >>> gets removed from the system. This can be useful when debugging performance >>> issues tied to ofproto flow changes, trying to determine deployed traffic >>> patterns, or while debugging dynamic systems where ports come and go. >>> >>> Prior to this change, there was a lack of visibility around flow expiration. >>> The existing debugging infrastructure could tell us when a flow was added to >>> the datapath, but not when it was removed or why. >>> >>> This change introduces a USDT probe at the point where the revalidator >>> determines that the flow should be removed. Additionally, we track the >>> reason for the flow eviction and provide that information as well. With >>> this change, we can track the complete flow lifecycle for the netlink >>> datapath by hooking the upcall tracepoint in kernel, the flow put USDT, and >>> the revaldiator USDT, letting us watch as flows are added and removed from >>> the kernel datapath. >>> >>> This change only enables this information via USDT probe, so it won't be >>> possible to access this information any other way (see: >>> Documentation/topics/usdt-probes.rst). >>> >>> Also included is a script (utilities/usdt-scripts/flow_reval_monitor.py) >>> which serves as a demonstration of how the new USDT probe might be used >>> going forward. >>> >>> Signed-off-by: Kevin Sprague <[email protected]> >>> Co-authored-by: Aaron Conole <[email protected]> >>> Signed-off-by: Aaron Conole <[email protected]> >> >> Thanks for following this up Aaron! See comments on this patch below. I have >> no additional comments on patch 2. 
>> >> Cheers, >> >> Eelco >> >> >>> --- >>> Documentation/topics/usdt-probes.rst | 1 + >>> ofproto/ofproto-dpif-upcall.c | 42 +- >>> utilities/automake.mk | 3 + >>> utilities/usdt-scripts/flow_reval_monitor.py | 653 +++++++++++++++++++ >>> 4 files changed, 693 insertions(+), 6 deletions(-) >>> create mode 100755 utilities/usdt-scripts/flow_reval_monitor.py >>> >>> diff --git a/Documentation/topics/usdt-probes.rst >>> b/Documentation/topics/usdt-probes.rst >>> index e527f43bab..a8da9bb1f7 100644 >>> --- a/Documentation/topics/usdt-probes.rst >>> +++ b/Documentation/topics/usdt-probes.rst >>> @@ -214,6 +214,7 @@ Available probes in ``ovs_vswitchd``: >>> - dpif_recv:recv_upcall >>> - main:poll_block >>> - main:run_start >>> +- revalidate:flow_result >>> - revalidate_ukey\_\_:entry >>> - revalidate_ukey\_\_:exit >>> - udpif_revalidator:start_dump >> >> You are missing the specific flow_result result section. This is from the >> previous patch: > > D'oh! Thanks for catching it. I'll re-add it. > >> @@ -358,6 +360,27 @@ See also the ``main:run_start`` probe above. >> - ``utilities/usdt-scripts/bridge_loop.bt`` >> >> >> +probe revalidate:flow_result >> +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ >> + >> +**Description**: >> +This probe is triggered when the revalidator decides whether or not to >> +revalidate a flow. ``reason`` is an enum that denotes that either the flow >> +is being kept, or the reason why the flow is being deleted. The >> +``flow_reval_monitor.py`` script uses this probe to notify users when flows >> +matching user-provided criteria are deleted. 
>> + >> +**Arguments**: >> + >> +- *arg0*: ``(enum flow_del_reason) reason`` >> +- *arg1*: ``(struct udpif *) udpif`` >> +- *arg2*: ``(struct udpif_key *) ukey`` >> + >> +**Script references**: >> + >> +- ``utilities/usdt-scripts/flow_reval_monitor.py`` >> + >> + >> Adding your own probes >> ---------------------- >> >>> diff --git a/ofproto/ofproto-dpif-upcall.c b/ofproto/ofproto-dpif-upcall.c >>> index b5cbeed878..97d75833f7 100644 >>> --- a/ofproto/ofproto-dpif-upcall.c >>> +++ b/ofproto/ofproto-dpif-upcall.c >>> @@ -269,6 +269,18 @@ enum ukey_state { >>> }; >>> #define N_UKEY_STATES (UKEY_DELETED + 1) >>> >>> +enum flow_del_reason { >>> + FDR_REVALIDATE = 0, /* The flow was revalidated. */ >> >> It was called FDR_FLOW_LIVE before, which might make more sense. As the flow >> is just NOT deleted. It might or might not have been revalidated. Thoughts? > > I think it had to have been revalidated if we emit the reason, because > we only emit the reason code after revalidation. IE: there are many > places where we skip revalidation but the flow stays live - and we don't > emit reasons in those cases. > > So at least for this patch, it MUST have been revalidated. But maybe in > the future, we would want to catch cases where the flow hasn't been. In > that case, it makes sense to add the FDR_FLOW_LIVE at that time - I > think. > > Maybe you disagree? Well, it depends on how you define revalidation, it might only have updated the counters. i.e. it all depends on ‘bool need_revalidate = ukey->reval_seq != reval_seq;’ in revalidate_ukey(). That was why I opted for a more general name. >>> + FDR_FLOW_IDLE, /* The flow went unused and was deleted. */ >>> + FDR_TOO_EXPENSIVE, /* The flow was too expensive to revalidate. */ >>> + FDR_FLOW_WILDCARDED, /* The flow needed a narrower wildcard mask. */ >>> + FDR_BAD_ODP_FIT, /* The flow had a bad ODP flow fit. */ >>> + FDR_NO_OFPROTO, /* The flow didn't have an associated ofproto. 
>>> */
>>> +    FDR_XLATION_ERROR,      /* There was an error translating the flow. */
>>> +    FDR_AVOID_CACHING,      /* Flow deleted to avoid caching. */
>>> +    FDR_FLOW_LIMIT,         /* All flows being killed. */
>>
>> Looking at the comment from Han on FDR_PURGE, and this patch needing another
>> spin, we should probably add it.
>
> I can do that, sure.  In that case, we will need to have a new flow op
> added to revalidator_sweep__ so that we can catch it.  But in that case,
> it will be a different usdt probe, so I still don't know if we need
> FDR_PURGE right?  WDYT?

In revalidator_sweep__() you have roughly the following:

                if (purge || ukey_state == UKEY_INCONSISTENT) {
                    result = UKEY_DELETE;
                } else if (!seq_mismatch) {

I’m afraid that if we use this tool for debugging, we will miss the
ukey_state == UKEY_INCONSISTENT case and spend a long time figuring that out.
Maybe add something general like this (I did not give it a lot of thought),
and only take the FDR_PURGE : FDR_UPDATE_FAIL results in the script?

 /* 'udpif_key's are responsible for tracking the little bit of state udpif
@@ -2991,13 +2993,13 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge)
         uint64_t odp_actions_stub[1024 / 8];
         struct ofpbuf odp_actions = OFPBUF_STUB_INITIALIZER(odp_actions_stub);

-        enum flow_del_reason reason = FDR_REVALIDATE;
         struct ukey_op ops[REVALIDATE_MAX_BATCH];
         struct udpif_key *ukey;
         struct umap *umap = &udpif->ukeys[i];
         size_t n_ops = 0;

         CMAP_FOR_EACH(ukey, cmap_node, &umap->cmap) {
+            enum flow_del_reason reason = FDR_REVALIDATE;
             enum ukey_state ukey_state;

             /* Handler threads could be holding a ukey lock while it installs a
@@ -3016,8 +3018,10 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge)

                 if (purge || ukey_state == UKEY_INCONSISTENT) {
                     result = UKEY_DELETE;
+                    reason = purge ? FDR_PURGE : FDR_UPDATE_FAIL;
                 } else if (!seq_mismatch) {
                     result = UKEY_KEEP;
+                    reason = FDR_REVALIDATE; //_KEEP
                 } else {
                     struct dpif_flow_stats stats;
                     COVERAGE_INC(revalidate_missed_dp_flow);
@@ -3030,6 +3034,8 @@ revalidator_sweep__(struct revalidator *revalidator, bool purge)
                     reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs,
                                   &odp_actions);
                 }
+                OVS_USDT_PROBE(revalidator_sweep__, flow_result, result,
+                               reason, udpif, ukey);
             }
             ovs_mutex_unlock(&ukey->mutex);

In addition, in revalidator_sweep__(), shouldn't the
“enum flow_del_reason reason = FDR_REVALIDATE;” be moved into the
CMAP_FOR_EACH() loop?

>>> +};
>>> +
>>>  /* 'udpif_key's are responsible for tracking the little bit of state udpif
>>>   * needs to do flow expiration which can't be pulled directly from the
>>>   * datapath.  They may be created by any handler or revalidator thread at any
>>> @@ -2272,7 +2284,8 @@ populate_xcache(struct udpif *udpif, struct udpif_key *ukey,
>>>  static enum reval_result
>>>  revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
>>>                    uint16_t tcp_flags, struct ofpbuf *odp_actions,
>>> -                  struct recirc_refs *recircs, struct xlate_cache *xcache)
>>> +                  struct recirc_refs *recircs, struct xlate_cache *xcache,
>>> +                  enum flow_del_reason *reason)
>>>  {
>>>      struct xlate_out *xoutp;
>>>      struct netflow *netflow;
>>> @@ -2293,11 +2306,13 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
>>>      netflow = NULL;
>>>
>>>      if (xlate_ukey(udpif, ukey, tcp_flags, &ctx)) {
>>> +        *reason = FDR_XLATION_ERROR;
>>>          goto exit;
>>>      }
>>>      xoutp = &ctx.xout;
>>>
>>>      if (xoutp->avoid_caching) {
>>> +        *reason = FDR_AVOID_CACHING;
>>>          goto exit;
>>>      }
>>>
>>> @@ -2311,6 +2326,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
>>>          ofpbuf_clear(odp_actions);
>>>
>>>          if (!ofproto) {
>>> +            *reason = FDR_NO_OFPROTO;
>>>              goto exit;
>>>          }
>>>
>>> @@ -2322,6 +2338,7 @@ revalidate_ukey__(struct udpif *udpif, const struct udpif_key *ukey,
>>>
if (odp_flow_key_to_mask(ukey->mask, ukey->mask_len, &dp_mask, >>> &ctx.flow, >>> NULL) >>> == ODP_FIT_ERROR) { >>> + *reason = FDR_BAD_ODP_FIT; >>> goto exit; >>> } >>> >>> @@ -2331,6 +2348,7 @@ revalidate_ukey__(struct udpif *udpif, const struct >>> udpif_key *ukey, >>> * down. Note that we do not know if the datapath has ignored any of >>> the >>> * wildcarded bits, so we may be overly conservative here. */ >>> if (flow_wildcards_has_extra(&dp_mask, ctx.wc)) { >>> + *reason = FDR_FLOW_WILDCARDED; >>> goto exit; >>> } >>> >>> @@ -2400,7 +2418,7 @@ static enum reval_result >>> revalidate_ukey(struct udpif *udpif, struct udpif_key *ukey, >>> const struct dpif_flow_stats *stats, >>> struct ofpbuf *odp_actions, uint64_t reval_seq, >>> - struct recirc_refs *recircs) >>> + struct recirc_refs *recircs, enum flow_del_reason *reason) >>> OVS_REQUIRES(ukey->mutex) >>> { >>> bool need_revalidate = ukey->reval_seq != reval_seq; >>> @@ -2430,8 +2448,12 @@ revalidate_ukey(struct udpif *udpif, struct >>> udpif_key *ukey, >>> xlate_cache_clear(ukey->xcache); >>> } >>> result = revalidate_ukey__(udpif, ukey, push.tcp_flags, >>> - odp_actions, recircs, ukey->xcache); >>> - } /* else delete; too expensive to revalidate */ >>> + odp_actions, recircs, ukey->xcache, >>> + reason); >>> + } else { >>> + /* delete; too expensive to revalidate */ >>> + *reason = FDR_TOO_EXPENSIVE; >>> + } >>> } else if (!push.n_packets || ukey->xcache >>> || !populate_xcache(udpif, ukey, push.tcp_flags)) { >>> result = UKEY_KEEP; >>> @@ -2831,6 +2853,7 @@ revalidate(struct revalidator *revalidator) >>> for (f = flows; f < &flows[n_dumped]; f++) { >>> long long int used = f->stats.used; >>> struct recirc_refs recircs = RECIRC_REFS_EMPTY_INITIALIZER; >>> + enum flow_del_reason reason = FDR_REVALIDATE; >>> struct dpif_flow_stats stats = f->stats; >>> enum reval_result result; >>> struct udpif_key *ukey; >>> @@ -2905,9 +2928,14 @@ revalidate(struct revalidator *revalidator) >>> } >>> if (kill_them_all || 
(used && used < now - max_idle)) { >>> result = UKEY_DELETE; >>> + if (kill_them_all) { >>> + reason = FDR_FLOW_LIMIT; >>> + } else { >>> + reason = FDR_FLOW_IDLE; >>> + } >>> } else { >>> result = revalidate_ukey(udpif, ukey, &stats, &odp_actions, >>> - reval_seq, &recircs); >>> + reval_seq, &recircs, &reason); >>> } >>> ukey->dump_seq = dump_seq; >>> >>> @@ -2916,6 +2944,7 @@ revalidate(struct revalidator *revalidator) >>> udpif_update_flow_pps(udpif, ukey, f); >>> } >>> >>> + OVS_USDT_PROBE(revalidate, flow_result, reason, udpif, ukey); >>> if (result != UKEY_KEEP) { >>> /* Takes ownership of 'recircs'. */ >>> reval_op_init(&ops[n_ops++], result, udpif, ukey, &recircs, >>> @@ -2962,6 +2991,7 @@ revalidator_sweep__(struct revalidator *revalidator, >>> bool purge) >>> uint64_t odp_actions_stub[1024 / 8]; >>> struct ofpbuf odp_actions = >>> OFPBUF_STUB_INITIALIZER(odp_actions_stub); >>> >>> + enum flow_del_reason reason = FDR_REVALIDATE; >>> struct ukey_op ops[REVALIDATE_MAX_BATCH]; >>> struct udpif_key *ukey; >>> struct umap *umap = &udpif->ukeys[i]; >>> @@ -2993,7 +3023,7 @@ revalidator_sweep__(struct revalidator *revalidator, >>> bool purge) >>> COVERAGE_INC(revalidate_missed_dp_flow); >>> memcpy(&stats, &ukey->stats, sizeof stats); >>> result = revalidate_ukey(udpif, ukey, &stats, >>> &odp_actions, >>> - reval_seq, &recircs); >>> + reval_seq, &recircs, &reason); >>> } >>> if (result != UKEY_KEEP) { >>> /* Clears 'recircs' if filled by revalidate_ukey(). 
*/ >>> diff --git a/utilities/automake.mk b/utilities/automake.mk >>> index 9a2114df40..146b8c37fb 100644 >>> --- a/utilities/automake.mk >>> +++ b/utilities/automake.mk >>> @@ -23,6 +23,7 @@ scripts_DATA += utilities/ovs-lib >>> usdt_SCRIPTS += \ >>> utilities/usdt-scripts/bridge_loop.bt \ >>> utilities/usdt-scripts/dpif_nl_exec_monitor.py \ >>> + utilities/usdt-scripts/flow_reval_monitor.py \ >>> utilities/usdt-scripts/kernel_delay.py \ >>> utilities/usdt-scripts/kernel_delay.rst \ >>> utilities/usdt-scripts/reval_monitor.py \ >>> @@ -72,6 +73,7 @@ EXTRA_DIST += \ >>> utilities/docker/debian/build-kernel-modules.sh \ >>> utilities/usdt-scripts/bridge_loop.bt \ >>> utilities/usdt-scripts/dpif_nl_exec_monitor.py \ >>> + utilities/usdt-scripts/flow_reval_monitor.py \ >>> utilities/usdt-scripts/kernel_delay.py \ >>> utilities/usdt-scripts/kernel_delay.rst \ >>> utilities/usdt-scripts/reval_monitor.py \ >>> @@ -146,6 +148,7 @@ FLAKE8_PYFILES += utilities/ovs-pcap.in \ >>> utilities/ovs-tcpdump.in \ >>> utilities/ovs-pipegen.py \ >>> utilities/usdt-scripts/dpif_nl_exec_monitor.py \ >>> + utilities/usdt-scripts/flow_reval_monitor.py \ >>> utilities/usdt-scripts/upcall_monitor.py \ >>> utilities/usdt-scripts/upcall_cost.py >>> >>> diff --git a/utilities/usdt-scripts/flow_reval_monitor.py >>> b/utilities/usdt-scripts/flow_reval_monitor.py >>> new file mode 100755 >>> index 0000000000..e808020bb5 >>> --- /dev/null >>> +++ b/utilities/usdt-scripts/flow_reval_monitor.py >>> @@ -0,0 +1,653 @@ >>> +#!/usr/bin/env python3 >>> +# >>> +# Copyright (c) 2022 Redhat, Inc. >>> +# >>> +# Licensed under the Apache License, Version 2.0 (the "License"); >>> +# you may not use this file except in compliance with the License. 
>>> +# You may obtain a copy of the License at: >>> +# >>> +# http://www.apache.org/licenses/LICENSE-2.0 >>> +# >>> +# Unless required by applicable law or agreed to in writing, software >>> +# distributed under the License is distributed on an "AS IS" BASIS, >>> +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. >>> +# See the License for the specific language governing permissions and >>> +# limitations under the License. >>> +# >>> +# Script information: >>> +# ------------------- >>> +# flow_reval_monitor.py uses the dpif_netlink_operate:flow_put and >>> +# revalidator:flow_result USDT probes to monitor flow lifetimes and >>> +# expiration events. By default, this will show all flow_put and flow >>> +# expiration events, along with their reasons. This will look like so: >>> +# >>> +# TIME UFID >>> EVENT/REASON >>> +# 101536.226986736 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow_put >>> +# 101536.227196214 ufid:d08472b6-110e-46cb-a9e4-00008f46838e flow_put >>> +# 101541.516610178 ufid:fc5cc4a2-39e7-4a2d-bbce-000019665b32 flow_put >>> +# 101541.516967303 ufid:fddd6510-26dc-4c87-8f7a-0000fc0c2c3a flow_put >>> +# 101551.688050747 ufid:fddd6510-26dc-4c87-8f7a-0000fc0c2c3a flow >>> timed out >>> +# 101551.688077175 ufid:fc5cc4a2-39e7-4a2d-bbce-000019665b32 flow >>> timed out >>> +# 101557.695391371 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow >>> timed out >>> +# 101557.695408909 ufid:d08472b6-110e-46cb-a9e4-00008f46838e flow >>> timed out >>> +# >>> +# flow key data can be printed using the --flow-keys option. This will >>> +# print the equivalent datapath flow string. >>> +# >>> +# When filtering flows, the syntax is the same as used by >>> +# `ovs-appctl dpctl/add-flow`. 
>>> +# >>> +# The following options are available: >>> +# >>> +# usage: flow_reval_monitor.py [-h] [--buffer-page-count NUMBER] >>> +# [-k [FLOW_KEYS]] [-p VSWITCHD_PID] >>> +# [-D [DEBUG]] [-f [FLOW STRING ...]] >> >> # usage: flow_reval_monitor.py [-h] [--buffer-page-count NUMBER] >> [-f [64-2048]] [-k] [-l [FLOW_STRING ...]] >> [-p VSWITCHD_PID] [-D [DEBUG]] > > Oops, I'll fix it up. > >>> +# >>> +# optional arguments: >>> +# -h, --help show this help message and exit >>> +# --buffer-page-count NUMBER >>> +# Number of BPF ring buffer pages, default 1024 >>> +# -f <64..2048>, --flow-key-size=<64..2048> >>> +# Set the size of the flow key, default 64 >>> +# -k, --flow-keys Print flow keys as flow strings >>> +# -l [FLOW_STRING ...], --filter-flows [FLOW_STRING ...] >>> +# Filter flows that match the specified ODP-like >>> flow >> >> We do not filter on the flow itself but on the packet content/keys creating >> the flow. >> We might want to clarify this as the actual DP flow might not include., for >> example, the IP fields. > > I guess it's ambiguous. I'll try and clean up the language. Because we > are filtering on the ODP flow key, and not an openflow string. > >>> +# -p VSWITCHD_PID, --pid VSWITCHD_PID >>> +# ovs-vswitchd's PID >>> +# -D [DEBUG], --debug [DEBUG] >>> +# Enable eBPF debugging >>> +# >>> +# Examples: >>> +# >>> +# To use the script on a running ovs-vswitchd to see flow keys and >>> expiration >>> +# events for flows with an ipv4 source of 192.168.10.10: >>> +# $ ./flow_reval_monitor.py --flow-keys --filter-flows \ >>> +# "ipv4(src=192.168.10.10)" >> >> Can we add some details on what kind of filters/format is supported? For >> example no mask support. > > Sure, I can add it. 
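To make the limits concrete, here is a minimal sketch of how a filter string is matched today. The two helpers mirror parse_flow_str() and compare_flow_to_target() from this patch; the decoded `flow` dict and the masked filter string are made-up inputs for illustration only. Values are compared as plain strings, so a masked match does not work:

```python
def parse_flow_str(flow_str):
    # Same logic as parse_flow_str() in this patch: "a(b=c,d=e),f(g)" becomes
    # {"a": {"b": "c", "d": "e"}, "f": "g"}; a bare "a" becomes {"a": True}.
    f_list = [i.strip(", ") for i in flow_str.split(")")]
    if f_list[-1] == "":
        f_list = f_list[:-1]
    flow_dict = {}
    for e in f_list:
        split_list = e.split("(")
        k = split_list[0]
        if len(split_list) == 1:
            flow_dict[k] = True
        elif split_list[1].count("=") == 0:
            flow_dict[k] = split_list[1]
        else:
            sub_dict = {}
            for subkey in [i.strip() for i in split_list[1].split(",")]:
                brk = subkey.find("=")
                sub_dict[subkey[:brk]] = subkey[brk + 1:]
            flow_dict[k] = sub_dict
    return flow_dict


def compare_flow_to_target(target, flow):
    # Same logic as compare_flow_to_target() in this patch: every key in the
    # filter must be present and, unless it is a bare key, compare equal.
    for key in target:
        if key not in flow:
            return False
        elif target[key] is True:
            continue
        elif target[key] == flow[key]:
            continue
        elif isinstance(target[key], dict) and isinstance(flow[key], dict):
            return compare_flow_to_target(target[key], flow[key])
        else:
            return False
    return True


# Hypothetical decoded key, shaped like the first value returned by
# parse_flow_dict().
flow = {"in_port": "2",
        "ipv4": {"src": "192.168.10.10", "dst": "192.168.10.30"}}

exact = parse_flow_str("ipv4(src=192.168.10.10)")
masked = parse_flow_str("ipv4(src=192.168.10.0/255.255.255.0)")

print(exact)                                  # {'ipv4': {'src': '192.168.10.10'}}
print(compare_flow_to_target(exact, flow))    # True
print(compare_flow_to_target(masked, flow))   # False
```

So the help text could state that only exact field values are matched and masks are not interpreted.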
> >>> +# TIME UFID >>> EVENT/REASON >>> +# 105082.457322742 ufid:f76fc899-376d-466b-bc74-0000b933eb97 flow_put >>> +# ufid:f76fc899-376d-466b-bc74-0000b933eb97 has the following flow >>> information: >>> +# in_port(2), >>> +# eth(src=0e:04:47:fc:74:51, dst=da:dc:c5:69:05:d7), \ >>> +# eth_type(0x800), \ >>> +# ipv4(src=192.168.10.10, dst=192.168.10.30, proto=1, tos=0, >>> ttl=64,[...]), >>> +# icmp(type=8, code=0) >>> +# 105092.635450202 ufid:f76fc899-376d-466b-bc74-0000b933eb97 Flow >>> timed out >>> +# >>> +# Notes: >>> +# 1) No options are needed to attach when there is a single running >>> instance >>> +# of ovs-vswitchd. >>> +# 2) If you're using the flow filtering option, it will only track flows >>> that >>> +# have been upcalled since the script began running. >>> +# 3) When using the flow filtering option, the key size will likely need >>> to >>> +# be expanded to match on all the fields in the message. The default >>> is >>> +# kept small to keep the buffer copy sizes down when displaying >>> +# flows (-k), but is hardcoded to 2048 when an actual filter (-l) is >>> +# applied >> >> We should add a note that the flow_put part is not included when HW offload >> (TC) is used for the kernel datapath, or if DPDK is used. > > That makes sense. But we will still have a revalidator output in > f.e. the DPDK case, IIRC. True, I just want to make sure we are clear that we will not see the flow_put messages hence the filtering will not work. 
>>> >>> +try: >>> + from bcc import BPF >>> + from bcc import USDT >>> + from bcc import USDTException >>> +except ModuleNotFoundError: >>> + print("ERROR: Can't find the BPF Compiler Collection Tools.") >>> + print("Please install them before running this script.") >>> + exit(1) >>> + >>> +import argparse >>> +from ipaddress import IPv4Address, IPv6Address >>> +import psutil >>> +import struct >>> +import sys >>> +import time >>> + >>> +# >>> +# eBPF source code >>> +# >>> +bpf_src = """ >>> +#include <linux/sched.h> >>> +#include <uapi/linux/ptrace.h> >>> + >>> +#define MAX_KEY <MAX_KEY_VAL> >>> +#define FLOW_FILTER <FILTER_BOOL> >>> + >>> +enum probe { OP_FLOW_PUT, FLOW_RESULT }; >>> + >>> +typedef union ovs_u128 { >>> + unsigned int ufid32[4]; >>> + unsigned long long ufid64[2]; >>> +} ovs_u128; >>> + >>> +struct dpif_flow_put { >>> + int flags; >>> + void *key_ptr; >>> + size_t key_len; >>> + void *mask_ptr; >>> + size_t mask_len; >>> + u64 action_ptr; >>> + size_t action_len; >>> + void *ufid_ptr; >>> +}; >>> + >>> +struct udpif_key { >>> + void *cmap_node; >>> + void *key_ptr; >>> + size_t key_len; >>> + void *mask_ptr; >>> + size_t mask_len; >>> + ovs_u128 ufid; >>> +}; >>> + >>> +struct event_t { >>> + u64 ts; >>> + u32 reason; >>> + u32 ufid[4]; /* Can't seem to make the ovs_u128 pass to python side. */ >> >> Is this still true? > > I didn't try it. Actually, I think these data structures can all be > extracted with pahole or something which converts the ovs_u128. > > Actually I think there's some converter under the hood and it doesn't > have a mapping of what 'ovs_u128' means. So we need to basically teach > it to make it work if we want that. Ok, not a blocking thing, just wondered if there was a quick fix or not. I thought it might be related to the BCC issue. 
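Related to the u32 ufid[4] layout: a small sketch of formatting the four words on the Python side, assuming the usual 8-4-4-4-12 UUID grouping. It differs from format_ufid() in this patch in one place: the patch masks the fifth group with `ufid[2] & 0`, which always prints 0000 (visible in the sample output in the script header, e.g. ...bc74-0000b933eb97). I am assuming 0xffff was intended. The input words below are made up.

```python
def format_ufid(ufid):
    """Format four 32-bit words as 'ufid:' plus 8-4-4-4-12 hex digits."""
    if ufid is None:
        return "ufid:none"

    return "ufid:{:08x}-{:04x}-{:04x}-{:04x}-{:04x}{:08x}".format(
        ufid[0], ufid[1] >> 16, ufid[1] & 0xffff,
        ufid[2] >> 16, ufid[2] & 0xffff, ufid[3])  # 0xffff is my assumption


# Made-up ufid words, only to show that the low half of word 2 survives.
words = (0xf76fc899, 0x376d466b, 0xbc741234, 0xb933eb97)
print(format_ufid(words))  # ufid:f76fc899-376d-466b-bc74-1234b933eb97
```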
>>> + u64 key_size; >>> + u8 key[MAX_KEY]; >>> + enum probe probe; >>> +}; >>> + >>> +BPF_HASH(watchlist, ovs_u128); >>> +BPF_RINGBUF_OUTPUT(events, <BUFFER_PAGE_COUNT>); >>> + >>> +int usdt__flow_result(struct pt_regs *ctx) { >>> + u64 *ufid_present = NULL; >>> + struct udpif_key ukey; >>> + >>> + bpf_usdt_readarg_p(3, ctx, &ukey, sizeof ukey); >>> + ovs_u128 ufid = ukey.ufid; >>> + ufid_present = watchlist.lookup(&ufid); >>> + if(FLOW_FILTER && !ufid_present) { >>> + return 0; >>> + } >>> + >>> + struct event_t *event = events.ringbuf_reserve(sizeof(struct event_t)); >>> + if(!event) { >>> + /* If we can't reserve the space in the ring buffer, return 1. */ >> >> See comments at the end regarding __sync_fetch_and_add(). >> >>> + return 1; >>> + } >>> + >>> + event->probe = FLOW_RESULT; >>> + event->ts = bpf_ktime_get_ns(); >>> + bpf_probe_read(&event->ufid, sizeof ufid, &ufid); >>> + bpf_usdt_readarg(1, ctx, &event->reason); >>> + events.ringbuf_submit(event, 0); >>> + >>> + return 0; >>> +}; >>> + >>> + >>> +int usdt__op_flow_put(struct pt_regs *ctx) { >>> + struct dpif_flow_put put; >>> + ovs_u128 ufid; >>> + >>> + struct event_t *event = events.ringbuf_reserve(sizeof(struct event_t)); >>> + if(!event) { >>> + /* If we can't reserve the space in the ring buffer, return 1. */ >> >> See comments at the end regarding __sync_fetch_and_add(). 
>> >>> + return 1; >>> + } >>> + >>> + event->probe = OP_FLOW_PUT; >>> + event->ts = bpf_ktime_get_ns(); >>> + bpf_usdt_readarg_p(2, ctx, &put, sizeof put); >>> + bpf_probe_read(&event->ufid, sizeof event->ufid, put.ufid_ptr); >>> + bpf_probe_read(&ufid, sizeof ufid, &event->ufid); >>> + if (put.key_len > MAX_KEY) { >>> + put.key_len = MAX_KEY; >>> + } >>> + event->key_size = put.key_len; >>> + bpf_probe_read(&event->key, put.key_len, put.key_ptr); >>> + event->reason = 0; >>> + events.ringbuf_submit(event, 0); >>> + >>> + watchlist.increment(ufid); >>> + return 0; >>> +}; >>> +""" >>> + >>> + >>> +# >>> +# buffer_size_type() >>> +# >>> +def buffer_size_type(astr, min=64, max=2048): >>> + value = int(astr) >>> + if min <= value <= max: >>> + return value >>> + else: >>> + raise argparse.ArgumentTypeError( >>> + 'value not in range {}-{}'.format(min, max)) >>> + >>> + >>> +# >>> +# format_ufid() >>> +# >>> +def format_ufid(ufid): >>> + if ufid is None: >>> + return "ufid:none" >>> + >>> + return "ufid:{:08x}-{:04x}-{:04x}-{:04x}-{:04x}{:08x}".format( >>> + ufid[0], ufid[1] >> 16, ufid[1] & 0xffff, >>> + ufid[2] >> 16, ufid[2] & 0, ufid[3]) >>> + >>> + >>> +# >>> +# find_and_delete_from_watchlist() >>> +# >>> +def find_and_delete_from_watchlist(event): >>> + for k, _ in b["watchlist"].items(): >>> + key_ufid = struct.unpack("=IIII", k) >>> + if key_ufid == tuple(event.ufid): >>> + key = (b["watchlist"].Key * 1)(k) >>> + b["watchlist"].items_delete_batch(key) >>> + break >>> + >>> + >>> +# >>> +# handle_flow_put() >>> +# >>> +def handle_flow_put(event): >>> + if args.flow_keys or args.filter_flows is not None: >>> + key = decode_key(bytes(event.key)[:event.key_size]) >>> + flow_dict, flow_str = parse_flow_dict(key) >>> + # For each attribute that we're watching. 
>>> +        if args.filter_flows is not None:
>>> +            if not compare_flow_to_target(args.filter_flows, flow_dict):
>>> +                find_and_delete_from_watchlist(event)
>>> +                return
>>> +
>>> +    print("{:<18.9f} {:<45} {:<13}".format(event.ts / 1000000000,
>>> +          format_ufid(event.ufid), "Insert (put) flow to kernel."))
>>
>> Maybe change this to “Insert (put) flow to kernel module.” to valid missing
>> tc flow put?
>
> Ack.
>
>>> +
>>> +    if args.flow_keys:
>>> +        if len(flow_str) > 80:
>>> +            flow_str = " " + "),\n ".join(flow_str.split("), "))
>>> +        else:
>>> +            flow_str = " " + flow_str
>>> +        print(" - It holds the following flow information:")
>>
>> This is confusing as, it’s not the flow information, i.e. flow installed,
>> but the keys from the packet.
>
> Agreed.
>
>>> +        print(flow_str)
>>> +
>>> +
>>> +#
>>> +# compare_flow_to_target()
>>> +#
>>> +def compare_flow_to_target(target, flow):
>>> +    for key in target:
>>> +        if key not in flow:
>>> +            return False
>>> +        elif target[key] is True:
>>> +            continue
>>> +        elif target[key] == flow[key]:
>>> +            continue
>>> +        elif isinstance(target[key], dict) and isinstance(flow[key], dict):
>>> +            return compare_flow_to_target(target[key], flow[key])
>>> +        else:
>>> +            return False
>>> +    return True
>>> +
>>> +
>>> +#
>>> +# parse_flow_str()
>>> +#
>>> +def parse_flow_str(flow_str):
>>> +    f_list = [i.strip(", ") for i in flow_str.split(")")]
>>> +    if f_list[-1] == "":
>>> +        f_list = f_list[:-1]
>>> +    flow_dict = {}
>>> +    for e in f_list:
>>> +        split_list = e.split("(")
>>> +        k = split_list[0]
>>> +        if len(split_list) == 1:
>>> +            flow_dict[k] = True
>>> +        elif split_list[1].count("=") == 0:
>>> +            flow_dict[k] = split_list[1]
>>> +        else:
>>> +            sub_dict = {}
>>> +            sublist = [i.strip() for i in split_list[1].split(",")]
>>> +            for subkey in sublist:
>>> +                brk = subkey.find("=")
>>> +                sub_dict[subkey[:brk]] = subkey[brk + 1:]
>>> +            flow_dict[k] = sub_dict
>>> +    return flow_dict
>>> +
>>> +
>>> +#
>>> +# print_expiration()
>>> +#
>>> +def print_expiration(event):
>>> +    reasons = ["Unknown flow expiration reason!", "Flow timed out",
>>> +               "Flow revalidation too expensive",
>>> +               "Flow needs narrower wildcard mask",
>>> +               "Bad ODP flow fit", "Flow with associated ofproto",
>>> +               "Flow translation error", "Flow cache avoidance",
>>> +               "Kill them all signal"]
>>
>> Should we maybe define this with something like this:
>>
>>   Event = IntEnum("flow_del_reason", ["FDR_FLOW_LIVE",
>>                                       "FDR_FLOW_TIME_OUT",
>>                                       ...], start=0)
>>
>> If we do this, we can also use flow_del_reason.FDR_FLOW_LIVE below.
>
> I wrote a bit below, but I was wondering if there's really a better way
> to do this like extracting the details from the code itself.  But for
> now, I can hard code something in there like is done in the other
> revalidator script.

I don't think we have had scripts relying on OVS enums before. I'm not sure
whether pahole can extract these as well.

>>> +    ufid_str = format_ufid(event.ufid)
>>> +    reason = event.reason
>>> +
>>> +    if reason not in range(0, len(reasons) - 1):
>>> +        reason = 0
>>> +    print("{:<18.9f} {:<45} {:<17}".
>>> +          format(event.ts / 1000000000, ufid_str, reasons[reason]))
>>> +
>>> +
>>> +#
>>> +# decode_key()
>>> +#
>>> +def decode_key(msg):
>>> +    bytes_left = len(msg)
>>> +    result = {}
>>> +    while bytes_left:
>>> +        if bytes_left < 4:
>>> +            break
>>> +        nla_len, nla_type = struct.unpack("=HH", msg[:4])
>>> +        if nla_len < 4:
>>> +            break
>>> +        nla_data = msg[4:nla_len]
>>> +        trunc = False
>>> +        if nla_len > bytes_left:
>>> +            trunc = True
>>> +            nla_data = nla_data[:(bytes_left - 4)]
>>
>> Can we just not break out of this right away without doing the
>> two above lines?
>
> I'll double check - I think I can rewrite this section a bit.
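Something along these lines might work for decode_key(). This is only a sketch, not a drop-in replacement. It stops as soon as an attribute does not fit in the captured buffer and returns a flag, so the caller can tell the user that the key was truncated. Assumptions on my side: the dict is keyed by the numeric attribute type (the patch maps it through get_ovs_key_attr_str()), and the (attrs, truncated) return shape is hypothetical.

```python
import struct

NLA_HDRLEN = 4  # u16 nla_len followed by u16 nla_type


def decode_key(msg):
    """Split a netlink attribute stream into {nla_type: payload}.

    Returns (attrs, truncated). 'truncated' is True when the buffer ends
    before every attribute could be read in full, or a header is malformed.
    """
    attrs = {}
    while len(msg) >= NLA_HDRLEN:
        nla_len, nla_type = struct.unpack("=HH", msg[:NLA_HDRLEN])
        if nla_len < NLA_HDRLEN or nla_len > len(msg):
            # Malformed or cut-off attribute: stop right away.
            return attrs, True
        attrs[nla_type] = msg[NLA_HDRLEN:nla_len]
        # Attributes are padded to a 4-byte boundary.
        msg = msg[(nla_len + 3) & ~3:]
    # Fewer than 4 bytes left over means a header was cut off.
    return attrs, len(msg) > 0


# Two attributes: type 3 with a 4-byte payload, type 6 with a 2-byte payload
# plus 2 bytes of padding.
full = (struct.pack("=HHI", 8, 3, 2)
        + struct.pack("=HH", 6, 6) + b"\x08\x00" + b"\x00\x00")

print(decode_key(full))       # both attributes, truncated == False
print(decode_key(full[:12]))  # only the first attribute, truncated == True
```

With a flag like this, handle_flow_put() could print a hint to raise the flow key size when the key was cut short.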
> >>> + else: >>> + result[get_ovs_key_attr_str(nla_type)] = nla_data >>> + if trunc: >>> + break >>> + next_offset = (nla_len + 3) & (~3) >>> + msg = msg[next_offset:] >>> + bytes_left -= next_offset >> >> if bytes_left: >> “Can we report that our buffer was truncated?!” >> >> Not sure how to do this, but with 64 bytes being the default the -k option >> only showed in_port() which took me a while to figure out. Maybe 128 would >> be better when -k is configured? > > Good idea. Actually, I don't know if 64 bytes would ever really make > sense anyway because it doesn't allow much to include. Agreed, I think 128 sounds like a good middle ground, however, it will not decode ARP messages (have not tried ipv6). Maybe 64 is good enough if -k/-f is not supplied (I guess we can even set it to 0 without -k or -f). >>> + return result >>> + >>> + >>> +# >>> +# get_ovs_key_attr_str() >>> +# >>> +def get_ovs_key_attr_str(attr): >>> + ovs_key_attr = ["OVS_KEY_ATTR_UNSPEC", >>> + "encap", >>> + "skb_priority", >>> + "in_port", >>> + "eth", >>> + "vlan", >>> + "eth_type", >>> + "ipv4", >>> + "ipv6", >>> + "tcp", >>> + "udp", >>> + "icmp", >>> + "icmpv6", >>> + "arp", >>> + "nd", >>> + "skb_mark", >>> + "tunnel", >>> + "sctp", >>> + "tcp_flags", >>> + "dp_hash", >>> + "recirc_id", >>> + "mpls", >>> + "ct_state", >>> + "ct_zone", >>> + "ct_mark", >>> + "ct_label", >>> + "ct_tuple4", >>> + "ct_tuple6", >>> + "nsh"] >>> + >>> + if attr < 0 or attr > len(ovs_key_attr): >>> + return "<UNKNOWN>: {}".format(attr) >>> + return ovs_key_attr[attr] >>> + >>> + >>> +# >>> +# is_nonzero() >>> +# >>> +def is_nonzero(val): >>> + if isinstance(val, int): >>> + return (val != 0) >>> + >>> + if isinstance(val, str): >>> + val = bytes(val, "utf-8") >>> + >>> + # If it's not a string or an int, it's bytes. 
>>> + return (val.count(0) < len(val)) >>> + >>> + >>> +# >>> +# parse_flow_dict() >>> +# >>> +def parse_flow_dict(key_dict, decode=True): >>> + ret_str = "" >>> + parseable = {} >>> + skip = ["nsh", "tunnel", "mpls", "vlan"] >>> + need_byte_swap = ["ct_label"] >>> + ipv4addrs = ["ct_tuple4", "tunnel", "ipv4", "arp"] >>> + ipv6addrs = ["ipv6", "nd", "ct_tuple6"] >>> + macs = {"eth": [0, 1], "arp": [3, 4], "nd": [1, 2]} >>> + fields = [("OVS_KEY_ATTR_UNSPEC"), >>> + ("encap", ), >>> + ("skb_priority", "<I"), >>> + ("in_port", "<I"), >>> + ("eth", "!6s6s", "src", "dst"), >>> + ("vlan", ), >>> + ("eth_type", "!H"), >>> + ("ipv4", "!4s4s4B", "src", "dst", "proto", "tos", "ttl", >>> "frag"), >>> + ("ipv6", "!16s16s4s4B", "src", "dst", >>> + "label", "proto", "tclass", "hlimit", "frag"), >>> + ("tcp", "!2H", "src", "dst"), >>> + ("udp", "!2H", "src", "dst"), >>> + ("icmp", "!2B", "type", "code"), >>> + ("icmpv6", "!2B", "type", "code"), >>> + ("arp", "!4s4sH6s6s", "sip", "tip", "op", "sha", "tha"), >>> + ("nd", "!16s6s6s", "target", "sll", "tll"), >>> + ("skb_mark", "<I"), >>> + ("tunnel", ), >>> + ("sctp", "!2H", "src", "dst"), >>> + ("tcp_flags", "!H"), >>> + ("dp_hash", "<I"), >>> + ("recirc_id", "<I"), >>> + ("mpls", ), >>> + ("ct_state", "<I"), >>> + ("ct_zone", "<H"), >>> + ("ct_mark", "<I"), >>> + ("ct_label", "!16s"), >>> + ("ct_tuple4", >>> + "!4s4s2HB", "src", "dst", "tp_src", "tp_dst", "proto"), >>> + ("ct_tuple6", >>> + "!16s16sB2H", "src", "dst", "proto", "tp_src", "tp_dst"), >>> + ("nsh", )] >>> + for k, v in key_dict.items(): >>> + s = "" >>> + if k in skip: >>> + continue >>> + if decode and int.from_bytes(v, "big") == 0: >>> + parseable[k] = "0" >>> + continue >>> + if decode and k in need_byte_swap: >>> + v = int.from_bytes(v, "little").to_bytes(len(v), "big") >>> + attr = -1 >>> + found = False >>> + for f in fields: >>> + if k == f[0]: >>> + attr = fields.index(f) >>> + found = True >>> + break >>> + if not found: >>> + raise KeyError("Invalid flow 
field '%s'" % k) >>> + if decode and len(fields[attr]) > 1: >>> + data = list(struct.unpack(fields[attr][1], >>> + v[:struct.calcsize(fields[attr][1])])) >>> + if k in ipv4addrs: >>> + if data[0].count(0) < 4: >>> + data[0] = str(IPv4Address(data[0])) >>> + else: >>> + data[0] = b"\x00" >>> + if data[1].count(0) < 4: >>> + data[1] = str(IPv4Address(data[1])) >>> + else: >>> + data[1] = b"\x00" >>> + if k in ipv6addrs: >>> + if data[0].count(0) < 16: >>> + data[0] = str(IPv6Address(data[0])) >>> + else: >>> + data[0] = b"\x00" >>> + if data[1].count(0) < len(data[1]): >>> + data[1] = str(IPv6Address(data[1])) >>> + else: >>> + data[1] = b"\x00" >>> + if k in macs.keys(): >>> + for e in macs[k]: >>> + if data[e].count(0) == 6: >>> + mac_str = b"\x00" >>> + else: >>> + mac_str = ":".join(["%02x" % i for i in data[e]]) >>> + data[e] = mac_str >>> + if decode and len(fields[attr]) > 2: >>> + field_dict = {field: d for field, d in zip(fields[attr][2:], >>> data)} >>> + s = ", ".join(k + "=" + str(v) for k, v in field_dict.items()) >>> + elif decode and k != "eth_type": >>> + s = str(data[0]) >>> + field_dict = s >>> + else: >>> + if decode: >>> + s = hex(data[0]) >>> + field_dict = s >>> + ret_str += k + "(" + s + "), " >>> + parseable[k] = field_dict >>> + ret_str = ret_str[:-2] >>> + return (parseable, ret_str) >>> + >>> + >>> +# >>> +# handle_event() >>> +# >>> +def handle_event(ctx, data, size): >>> + # Once we grab the event, we have three cases. >>> + # 1. It's a revalidator probe and the reason is nonzero: A flow is >>> expiring >>> + # 2. It's a revalidator probe and the reason is zero: flow revalidated >>> + # 3. It's a flow_put probe. >>> + # >>> + # We will ignore case 2, and report all others. 
>>> + # >>> + event = b["events"].event(data) >>> + if event.probe == 0: # OP_FLOW_PUT >> >> Here we should also define an enum for the probe events, see ‘Event = >> IntEnum("Event"...’ and ‘<EVENT_ENUM>’ in reval_monitor.py >> >>> + handle_flow_put(event) >>> + elif event.probe == 1 and event.reason > 0: # FLOW_RESULT >> >> Here we could do “event.reason > flow_del_reason.FDR_FLOW_LIVE”, see comment >> above. > > I can do the above, but I also am wondering if it's possible to have > something we can use to fill up the enum dynamically without needing to > duplicate things on the python side. That would be nice, maybe pahole already supports this. >>> + print_expiration(event) >>> + >>> + >>> +def main(): >>> + # >>> + # Don't like these globals, but ctx passing does not work with the >>> existing >>> + # open_ring_buffer() API :( >>> + # >>> + global b >>> + global args >>> + >>> + # >>> + # Argument parsing >>> + # >>> + parser = argparse.ArgumentParser() >>> + parser.add_argument("--buffer-page-count", >>> + help="Number of BPF ring buffer pages, default >>> 1024", >>> + type=int, default=1024, metavar="NUMBER") >>> + parser.add_argument("-f", "--flow-key-size", >>> + help="Set maximum flow key size to capture, " >>> + "default 64 - see notes", type=buffer_size_type, >>> + default=64, metavar="[64-2048]") >>> + parser.add_argument("-k", "--flow-keys", >>> + help="Print flow keys as flow strings", >>> + action="store_true") >>> + parser.add_argument("-l", "--filter-flows", metavar="FLOW_STRING", >>> + help="Filter flows that match the specified " >>> + "ODP-like flow", >>> + type=str, default=None, nargs="*") >>> + parser.add_argument("-p", "--pid", metavar="VSWITCHD_PID", >>> + help="ovs-vswitchd's PID", type=int, default=None) >>> + parser.add_argument("-D", "--debug", help="Enable eBPF debugging", >>> + type=int, const=0x3f, default=0, nargs="?") >>> + args = parser.parse_args() >>> + >>> + # >>> + # Find the PID of the ovs-vswitchd daemon if not specified.
>>> + # >>> + if args.pid is None: >>> + for proc in psutil.process_iter(): >>> + if "ovs-vswitchd" in proc.name(): >>> + if args.pid is not None: >>> + print("Error: Multiple ovs-vswitchd daemons running, " >>> + "use the -p option!") >>> + sys.exit(-1) >>> + >>> + args.pid = proc.pid >>> + # >>> + # Error checking on input parameters >>> + # >>> + if args.pid is None: >>> + print("ERROR: Failed to find ovs-vswitchd's PID!") >>> + sys.exit(-1) >>> + >>> + # >>> + # Attach the USDT probes >>> + # >>> + u = USDT(pid=int(args.pid)) >>> + try: >>> + u.enable_probe(probe="op_flow_put", fn_name="usdt__op_flow_put") >>> + except USDTException as e: >>> + print("Error attaching the dpif_netlink_operate__:op_flow_put >>> probe.") >>> + print(str(e)) >>> + sys.exit(-1) >>> + >>> + try: >>> + u.enable_probe(probe="flow_result", fn_name="usdt__flow_result") >>> + except USDTException as e: >>> + print("Error attaching the revalidate:flow_result probe.") >>> + print(str(e)) >>> + sys.exit(-1) >>> + >>> + # >>> + # Attach the probes to the running process >>> + # >>> + source = bpf_src.replace("<BUFFER_PAGE_COUNT>", >>> + str(args.buffer_page_count)) >>> + >>> + if args.filter_flows is None: >>> + filter_bool = 0 >>> + >>> + # Set the key size based on what the user wanted >>> + source = source.replace("<MAX_KEY_VAL>", str(args.flow_key_size)) >>> + else: >>> + filter_bool = 1 >>> + args.filter_flows = parse_flow_str(args.filter_flows[0]) >>> + >>> + # Run through the parser to make sure we only filter on fields we >>> + # understand >>> + parse_flow_dict(args.filter_flows, False) >>> + >>> + # This is hardcoded here because it doesn't make sense to shrink >>> the >>> + # size, since the flow key might be missing fields that are >>> matched in >>> + # the flow filter. 
>>> + source = source.replace("<MAX_KEY_VAL>", "2048") >>> + >>> + source = source.replace("<FILTER_BOOL>", str(filter_bool)) >>> + >>> + b = BPF(text=source, usdt_contexts=[u], debug=args.debug) >>> + >>> + # >>> + # Print header >>> + # >>> + print("{:<18} {:<45} {:<17}".format("TIME", "UFID", "EVENT/REASON")) >>> + >>> + # >>> + # Dump out all events. >>> + # >>> + b["events"].open_ring_buffer(handle_event) >>> + while 1: >>> + try: >>> + b.ring_buffer_poll() >>> + time.sleep(0.5) >> >> I think we can remove this sleep. > > I'll try without it. IIRC, the ring buffer polling was very aggressive > on the CPU, but that is just a memory from mid-2022. I got the ‘remove’ comment from Adrian also a while back and did some tests and I did not see any load increase on the Python application. But it might be worth it for you to do the same, you never know where I screwed up ;) >>> + except KeyboardInterrupt: >>> + break >>> + >>> + >>> +# >>> +# Start main() as the default entry point >>> +#< >>> +if __name__ == "__main__": >>> + main() >>> -- >>> 2.41.0 >> >> Missing my previous comment on adding a check to make sure we do not lose >> events: >> >> “ >> Forgot to mention that you probably also want to add some checking to make >> sure you do not lose events. >> >> See __sync_fetch_and_add() below: >> >> +BPF_TABLE("percpu_array", uint32_t, uint64_t, dropcnt, 1); >> + >> +static struct event_t *get_event(uint32_t id) { >> + struct event_t *event = events.ringbuf_reserve(sizeof(struct event_t)); >> + >> + if (!event) { >> + uint32_t type = 0; >> + uint64_t *value = dropcnt.lookup(&type); >> + if (value) >> + __sync_fetch_and_add(value, 1); >> + >> + return NULL; >> + } >> + >> + event->id = id; >> + event->ts = bpf_ktime_get_ns(); >> + event->pid = bpf_get_current_pid_tgid(); >> + >> + return event; >> +} >> “ >> >> The other missing part is to include the PID/TID in the output so we can >> relate to which revalidator thread did this (or add the comm with the name). 
> > Okay. > >> And finally, the part that got this patch delayed, not adding static OVS >> structure definitions. Which is still the case in this version. For now, you >> should probably copy the get_ovs_definitions() implementation from >> reval_monitor.py. > > Will do.
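For reference, a minimal sketch of the enum-based dispatch suggested in the handle_event() comments, replacing the bare 0 and 1 literals. The Event values mirror the literals in the quoted code; FlowDelReason is a hypothetical Python-side name, FDR_FLOW_LIVE == 0 is inferred from the "reason is zero: flow revalidated" comment, and FDR_PLACEHOLDER is only a stand-in for the real deletion reasons, which are not listed in this thread.

```python
from enum import IntEnum

# Probe identifiers; the values match the literals used in handle_event().
Event = IntEnum("Event", ["OP_FLOW_PUT", "FLOW_RESULT"], start=0)


class FlowDelReason(IntEnum):
    # Hypothetical mirror of the C-side flow_del_reason enum.
    FDR_FLOW_LIVE = 0    # assumed: flow was revalidated and kept
    FDR_PLACEHOLDER = 1  # stand-in for the real deletion reasons


def classify_event(probe, reason):
    """Return which handler applies to an event, or None to ignore it."""
    if probe == Event.OP_FLOW_PUT:
        return "flow_put"
    if probe == Event.FLOW_RESULT and reason > FlowDelReason.FDR_FLOW_LIVE:
        return "expiration"
    return None  # FLOW_RESULT with a live flow: nothing to report
```

In the script itself the two string results would correspond to calling handle_flow_put(event) and print_expiration(event).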

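On the PID/TID point: bpf_get_current_pid_tgid() returns a 64-bit value with the tgid (the user-space PID) in the upper 32 bits and the kernel pid (the thread ID) in the lower 32 bits. A rough sketch of how the Python side could split it for the output, assuming the event field keeps the full 64-bit value; split_pid_tgid() and format_thread() are hypothetical helper names, not part of the patch.

```python
def split_pid_tgid(pid_tgid):
    """Split a bpf_get_current_pid_tgid() value into (pid, tid)."""
    pid = pid_tgid >> 32         # upper 32 bits: tgid, i.e. the process ID
    tid = pid_tgid & 0xFFFFFFFF  # lower 32 bits: kernel pid, i.e. thread ID
    return pid, tid


def format_thread(pid_tgid):
    """Render the PID/TID pair as a column for the event output."""
    pid, tid = split_pid_tgid(pid_tgid)
    return "{}/{}".format(pid, tid)
```

If the event struct only has a 32-bit pid field, the assignment truncates to the lower half, so only the thread ID would survive and the field would need to be widened to carry both.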