Add a Python script that loads a test module, triggers its debugfs
entry with kcov_dataflow recording active, then pretty-prints captured
records as a nested call tree with kallsyms symbol resolution.

Features:
- 8MB ring buffer (1M u64 words) for INSTRUMENT_ALL kernels
- Enable recording after module load, before trigger (avoids VFS noise)
- Variable-length record parsing using header-encoded field count
- Module-only filtering via kallsyms symbol lookup
- --context/-C N: show N records before/after each module function call
- --raw: print raw records instead of call tree
- Architecture-aware syscall numbers (x86_64 and arm64)

Usage:

  python3 trigger-view.py eight_args_c \
    --ko eight_args_c/eight_args_c.ko

  python3 trigger-view.py eight_args_rust \
    --ko eight_args_rust/eight_args_rust.ko

  python3 trigger-view.py rust_ffi_contract \
    --ko rust_ffi_contract/rust_ffi_contract.ko

Cc: Alexander Potapenko <[email protected]>
Assisted-by: Claude:claude-opus-4-6 [kiro-chat]
Link: https://github.com/yskzalloc/kcov-dataflow/actions
Signed-off-by: Yunseong Kim <[email protected]>
---
 .../selftests/kcov_dataflow/trigger-view.py        | 377 +++++++++++++++++++++
 1 file changed, 377 insertions(+)

diff --git a/tools/testing/selftests/kcov_dataflow/trigger-view.py 
b/tools/testing/selftests/kcov_dataflow/trigger-view.py
new file mode 100755
index 000000000000..a3274e472dc1
--- /dev/null
+++ b/tools/testing/selftests/kcov_dataflow/trigger-view.py
@@ -0,0 +1,377 @@
+#!/usr/bin/env python3
+# SPDX-License-Identifier: GPL-2.0
+"""
+trigger-view.py - Load a module with kcov_dataflow
+recording active, then pretty-print captured records.
+
+Usage:
+    python3 trigger-view.py eight_args_c
+    python3 trigger-view.py rust_ffi_contract
+    python3 trigger-view.py eight_args_c --raw
+
+The script:
+  1. Opens /sys/kernel/debug/kcov_dataflow
+  2. Inits and mmaps the buffer
+  3. Enables recording for this process
+  4. Loads the module via finit_module() -- init runs in our context
+  5. Disables recording
+  6. Unloads the module
+  7. Parses and prints captured records with kallsyms resolution
+"""
+import os
+import sys
+import struct
+import ctypes
+import ctypes.util
+import argparse
+import fcntl
+
+# Constants
+DF_TYPE_ENTRY = 0xE
+DF_TYPE_RET = 0xF
+MAGIC_BAD = 0xBADADD85
+BUF_SIZE = 1048576  # 1M words = 8MB
+
+# Ioctl numbers
+def _IOR(t, nr, size):
+    return (2 << 30) | (ord(t) << 8) | nr | (size << 16)
+
+def _IO(t, nr):
+    return (ord(t) << 8) | nr
+
+KCOV_DF_INIT_TRACK = _IOR('d', 1, 8)
+KCOV_DF_ENABLE = _IO('d', 100)
+KCOV_DF_DISABLE = _IO('d', 101)
+
+# syscall numbers (x86_64)
+import platform
+_machine = platform.machine()
+if _machine == "aarch64":
+    SYS_FINIT_MODULE = 273
+    SYS_DELETE_MODULE = 106
+else:  # x86_64
+    SYS_FINIT_MODULE = 313
+    SYS_DELETE_MODULE = 176
+
+SELFTEST_DIR = os.path.dirname(os.path.abspath(__file__))
+
+
+def load_kallsyms():
+    """Load kernel symbols for PC resolution."""
+    syms = []
+    try:
+        with open("/proc/kallsyms") as f:
+            for line in f:
+                parts = line.split()
+                if len(parts) >= 3:
+                    addr = int(parts[0], 16)
+                    name = parts[2]
+                    mod = parts[3].strip("[]") if len(parts) > 3 else ""
+                    syms.append((addr, name, mod))
+    except (PermissionError, FileNotFoundError):
+        pass
+    syms.sort()
+    return syms
+
+
+def symbolize(pc, syms):
+    """Find nearest symbol <= pc."""
+    if not syms:
+        return f"0x{pc:x}"
+    lo, hi = 0, len(syms) - 1
+    while lo < hi:
+        mid = (lo + hi + 1) // 2
+        if syms[mid][0] <= pc:
+            lo = mid
+        else:
+            hi = mid - 1
+    addr, name, mod = syms[lo]
+    if addr > pc:
+        return f"0x{pc:x}"
+    offset = pc - addr
+    if mod:
+        return f"{name}+0x{offset:x} [{mod}]" if offset else f"{name} [{mod}]"
+    return f"{name}+0x{offset:x}" if offset else name
+
+
+def format_val(v):
+    """Format a captured value."""
+    if v == MAGIC_BAD:
+        return "FAULT"
+    if v == 0:
+        return "0x0"
+    return f"0x{v:x}"
+
+
+def find_module(name):
+    """Find the .ko file for the given test name."""
+    ko_path = os.path.join(SELFTEST_DIR, name, f"{name}_mod.ko")
+    if os.path.exists(ko_path):
+        return ko_path
+    # Try without _mod suffix
+    ko_path = os.path.join(SELFTEST_DIR, name, f"{name}.ko")
+    if os.path.exists(ko_path):
+        return ko_path
+    # Search for any .ko in the directory
+    mod_dir = os.path.join(SELFTEST_DIR, name)
+    if os.path.isdir(mod_dir):
+        for f in os.listdir(mod_dir):
+            if f.endswith(".ko"):
+                return os.path.join(mod_dir, f)
+    return None
+
+
+def finit_module(ko_path):
+    """Load a kernel module via finit_module syscall."""
+    libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
+    fd = os.open(ko_path, os.O_RDONLY)
+    ret = libc.syscall(SYS_FINIT_MODULE, fd, b"", 0)
+    os.close(fd)
+    if ret != 0:
+        errno = ctypes.get_errno()
+        raise OSError(errno, f"finit_module({ko_path}): {os.strerror(errno)}")
+
+
+def delete_module(name):
+    """Unload a kernel module."""
+    libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
+    ret = libc.syscall(SYS_DELETE_MODULE, name.encode(), 0)
+    if ret != 0:
+        errno = ctypes.get_errno()
+        raise OSError(errno, f"delete_module({name}): {os.strerror(errno)}")
+
+
+def parse_records(buf, total_words):
+    """Parse the ring buffer into a list of records."""
+    records = []
+    pos = 1
+    while pos + 3 <= total_words and pos < BUF_SIZE:
+        hdr = buf[pos]
+
+        # Valid headers fit in 32 bits (upper 32 must be zero)
+        if hdr >> 32:
+            pos += 1
+            continue
+
+        rtype = (hdr >> 28) & 0xF
+
+        if rtype not in (DF_TYPE_ENTRY, DF_TYPE_RET):
+            pos += 1
+            continue
+
+        pc = buf[pos + 1]
+        meta = buf[pos + 2]
+        seq = hdr & 0x00FFFFFF
+        num_vals = (hdr >> 24) & 0xF
+        if num_vals == 0:
+            num_vals = 1
+
+        # Valid records always have a non-zero PC (kernel text address)
+        if pc == 0:
+            pos += 1
+            continue
+
+        val = buf[pos + 3] if pos + 3 < BUF_SIZE else 0
+        records.append({
+            "type": rtype,
+            "seq": seq,
+            "pc": pc,
+            "meta": meta,
+            "val": val,
+        })
+        pos += 3 + num_vals
+    return records
+
+
+def print_raw(records, syms):
+    """Print records in raw format."""
+    for r in records:
+        sym = symbolize(r["pc"], syms)
+        t = "ENTRY" if r["type"] == DF_TYPE_ENTRY else "RET  "
+        arg_idx = (r["meta"] >> 56) & 0xFF
+        size = (r["meta"] >> 48) & 0xFF
+        print(f"[{t}] seq={r['seq']:3d} {sym} "
+              f"arg[{arg_idx}]({size}) = {format_val(r['val'])}")
+
+
+def print_tree(records, syms):
+    """Print records as indented call tree matching converted.txt format."""
+    depth = 0
+    # Group consecutive ENTRY records by PC to collect all args
+    i = 0
+    while i < len(records):
+        r = records[i]
+        sym = symbolize(r["pc"], syms)
+
+        if r["type"] == DF_TYPE_ENTRY:
+            # Collect all args for this call (same PC, consecutive entries)
+            args = []
+            pc = r["pc"]
+            while i < len(records) and records[i]["type"] == DF_TYPE_ENTRY \
+                    and records[i]["pc"] == pc:
+                args.append(format_val(records[i]["val"]))
+                i += 1
+            indent = "  " * depth
+            print(f"{indent}{sym}({', '.join(args)})")
+            depth += 1
+        else:
+            depth = max(0, depth - 1)
+            indent = "  " * depth
+            print(f"{indent}{format_val(r['val'])} = {sym}()")
+            i += 1
+
+
+def main():
+    parser = argparse.ArgumentParser(
+        description="Load a test module with kcov_dataflow and view records")
+    parser.add_argument("module", help="Test module name (e.g. eight_args_c)")
+    parser.add_argument("--raw", action="store_true",
+                        help="Print raw records instead of tree")
+    parser.add_argument("--ko", help="Explicit path to .ko file")
+    parser.add_argument("--context", "-C", type=int, default=0,
+                        help="Show N lines before/after each module record")
+    args = parser.parse_args()
+
+    # Find module
+    if args.ko:
+        ko_path = args.ko
+    else:
+        ko_path = find_module(args.module)
+    if not ko_path or not os.path.exists(ko_path):
+        print(f"Cannot find module for '{args.module}'", file=sys.stderr)
+        print(f"Build it first: make LLVM=1 CC=clang "
+              f"M=tools/testing/selftests/kcov_dataflow/{args.module} modules",
+              file=sys.stderr)
+        sys.exit(1)
+
+    # Open kcov_dataflow
+    # Ensure kallsyms shows real addresses
+    try:
+        with open("/proc/sys/kernel/kptr_restrict", "w") as f:
+            f.write("0")
+    except (PermissionError, FileNotFoundError):
+        pass
+
+    try:
+        df_fd = os.open("/sys/kernel/debug/kcov_dataflow", os.O_RDWR)
+    except OSError as e:
+        print(f"Cannot open kcov_dataflow: {e}", file=sys.stderr)
+        sys.exit(1)
+
+    # Init + mmap
+    fcntl.ioctl(df_fd, KCOV_DF_INIT_TRACK, BUF_SIZE)
+    libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
+    libc.mmap.restype = ctypes.c_void_p
+    libc.mmap.argtypes = [
+        ctypes.c_void_p, ctypes.c_size_t, ctypes.c_int,
+        ctypes.c_int, ctypes.c_int, ctypes.c_long
+    ]
+    buf_ptr = libc.mmap(None, BUF_SIZE * 8, 0x3, 0x01, df_fd, 0)
+    if buf_ptr == ctypes.c_void_p(-1).value:
+        print("mmap failed", file=sys.stderr)
+        sys.exit(1)
+    buf = (ctypes.c_uint64 * BUF_SIZE).from_address(buf_ptr)
+
+    # Load module first (generates noise with INSTRUMENT_ALL)
+    mod_name = os.path.basename(ko_path).replace(".ko", "")
+    try:
+        finit_module(ko_path)
+        print(f"# Loaded {mod_name}")
+    except OSError as e:
+        print(f"Failed to load module: {e}", file=sys.stderr)
+        sys.exit(1)
+
+    # Get module .text address for PC filtering
+    mod_text_start = 0
+    try:
+        with open(f"/sys/module/{mod_name}/sections/.text") as f:
+            mod_text_start = int(f.read().strip(), 16)
+    except (FileNotFoundError, ValueError, PermissionError):
+        pass
+
+    # Enable recording AFTER load, BEFORE trigger (avoids VFS/loader noise)
+    fcntl.ioctl(df_fd, KCOV_DF_ENABLE, 0)
+    buf[0] = 0
+
+    # Trigger the module's debugfs file to invoke test functions
+    trigger_paths = [
+        f"/sys/kernel/debug/kcov_dataflow_test/trigger",
+        f"/sys/kernel/debug/kcov_dataflow_test/rust_ffi_trigger",
+        f"/sys/kernel/debug/trigger_rust",
+        f"/sys/kernel/debug/{mod_name}/trigger",
+    ]
+    for tp in trigger_paths:
+        try:
+            with open(tp, "w") as f:
+                f.write("1")
+            break
+        except (FileNotFoundError, PermissionError):
+            continue
+
+    fcntl.ioctl(df_fd, KCOV_DF_DISABLE, 0)
+
+    # Read kallsyms while module is still loaded (symbols available)
+    syms = load_kallsyms()
+
+    # Unload
+    try:
+        delete_module(mod_name)
+    except OSError:
+        pass
+
+    # Parse and display
+    total = int(buf[0])
+    print(f"# Captured {total} words")
+    records = parse_records(buf, total)
+    print(f"# {len(records)} records")
+
+    # Filter to module records using kallsyms
+    # Build set of module symbol addresses for fast lookup
+    mod_syms = set()
+    for addr, name, mod in syms:
+        if mod == mod_name and addr != 0:
+            mod_syms.add(addr)
+
+    def is_module_pc(pc):
+        """Check if PC belongs to mod_name via kallsyms."""
+        if mod_syms:
+            # Binary search: find nearest symbol <= pc, check module
+            lo, hi = 0, len(syms) - 1
+            while lo < hi:
+                mid = (lo + hi + 1) // 2
+                if syms[mid][0] <= pc:
+                    lo = mid
+                else:
+                    hi = mid - 1
+            return syms[lo][2] == mod_name
+        # Fallback: if no module symbols (kptr_restrict), use .text start
+        return mod_text_start and pc >= mod_text_start
+
+    if syms or mod_text_start:
+        if args.context > 0:
+            module_indices = set()
+            for i, r in enumerate(records):
+                if is_module_pc(r["pc"]):
+                    for j in range(max(0, i - args.context),
+                                   min(len(records), i + args.context + 1)):
+                        module_indices.add(j)
+            records = [records[i] for i in sorted(module_indices)]
+            print(f"# showing {len(records)} records with 
context={args.context} "
+                  f"around {mod_name}\n")
+        else:
+            module_records = [r for r in records if is_module_pc(r["pc"])]
+            print(f"# {len(module_records)} from {mod_name}\n")
+            records = module_records
+    else:
+        print("")
+
+    if args.raw:
+        print_raw(records, syms)
+    else:
+        print_tree(records, syms)
+
+    os.close(df_fd)
+
+
+if __name__ == "__main__":
+    main()

-- 
2.43.0


Reply via email to