Add a process UI tab entry to show flow statistics per PID.

Also change flow_entry so that it now holds a pointer to a new struct
proc_entry object, which contains the process-related info.

On each 1 second refresh every proc_entry is checked: it is deleted if
no flows are related to it anymore (flows_count is 0) and the process no
longer exists (checked via the /proc/<pid> path). Otherwise its dst & src
rate info is zeroed and re-summed from all the related flows which are in
the proc_entry->flows list.

The bytes & pkts totals keep accumulating for the whole time the process
exists.

Signed-off-by: Vadim Kochan <vadi...@gmail.com>
---
 flowtop.c | 349 +++++++++++++++++++++++++++++++++++++++++++++++++++++---------
 proc.c    |  11 ++
 proc.h    |   3 +-
 3 files changed, 315 insertions(+), 48 deletions(-)

diff --git a/flowtop.c b/flowtop.c
index 78ac253..2491de7 100644
--- a/flowtop.c
+++ b/flowtop.c
@@ -52,7 +52,25 @@
 #define USEC_PER_SEC 1000000L
 #endif
 
+struct proc_entry {
+       struct cds_list_head entry;     /* link in proc_list.head */
+       struct cds_list_head flows;     /* head of the list linked via flow_entry.proc_head */
+       struct rcu_head rcu;            /* handed to call_rcu() when the entry is freed */
+
+       struct timeval last_update;     /* set at creation and on every refresh pass */
+       unsigned int pid;
+       char name[256];                 /* basename of the process command line */
+       uint64_t pkts_src, bytes_src;   /* totals accumulated from the linked flows */
+       uint64_t pkts_dst, bytes_dst;
+       double rate_bytes_src;          /* rates: zeroed and re-summed from the flows on refresh */
+       double rate_bytes_dst;
+       double rate_pkts_src;
+       double rate_pkts_dst;
+       int flows_count;                /* number of flows currently linked in ->flows */
+};
+
 struct flow_entry {
+       struct cds_list_head proc_head;
        struct cds_list_head entry;
        struct rcu_head rcu;
 
@@ -69,9 +87,8 @@ struct flow_entry {
        char country_code_src[4], country_code_dst[4];
        char city_src[128], city_dst[128];
        char rev_dns_src[256], rev_dns_dst[256];
-       char procname[256];
+       struct proc_entry *proc;
        int inode;
-       unsigned int procnum;
        bool is_visible;
        struct nf_conntrack *ct;
        struct timeval last_update;
@@ -85,6 +102,10 @@ struct flow_list {
        struct cds_list_head head;
 };
 
+struct proc_list {
+       struct cds_list_head head;
+};
+
 enum flow_direction {
        FLOW_DIR_SRC,
        FLOW_DIR_DST,
@@ -129,6 +150,7 @@ static volatile bool do_reload_flows;
 static volatile bool is_flow_collecting;
 static volatile sig_atomic_t sigint = 0;
 static int what = INCLUDE_IPV4 | INCLUDE_IPV6 | INCLUDE_TCP;
+static struct proc_list proc_list;
 static struct flow_list flow_list;
 static struct sysctl_params_ctx sysctl = { -1, -1 };
 
@@ -156,10 +178,22 @@ enum tbl_flow_col {
        TBL_FLOW_RATE,
 };
 
+enum tbl_proc_col {
+       TBL_PROC_NAME,
+       TBL_PROC_PID,
+       TBL_PROC_FLOWS,
+       TBL_PROC_BYTES_SRC,
+       TBL_PROC_RATE_SRC,
+       TBL_PROC_BYTES_DST,
+       TBL_PROC_RATE_DST,
+};
+
 static struct ui_table flows_tbl;
+static struct ui_table procs_tbl;
 
 enum tab_entry {
        TAB_FLOWS,
+       TAB_PROCS,
 };
 
 static const char *short_options = "vhTUsDIS46ut:nGb";
@@ -432,6 +466,11 @@ static int flow_list_del_entry(struct flow_list *fl, const 
struct nf_conntrack *
 
        n = flow_list_find_id(fl, nfct_get_attr_u32(ct, ATTR_ID));
        if (n) {
+               if (n->proc) {
+                       cds_list_del(&n->proc_head);
+                       n->proc->flows_count--;
+               }
+
                cds_list_del_rcu(&n->entry);
                call_rcu(&n->rcu, flow_entry_xfree_rcu);
        }
@@ -449,22 +488,72 @@ static void flow_list_destroy(struct flow_list *fl)
        }
 }
 
+static void proc_list_init(struct proc_list *proc_list)
+{
+       CDS_INIT_LIST_HEAD(&proc_list->head);
+}
+
+static struct proc_entry *proc_list_new_entry(unsigned int pid) /* find-or-create the entry for pid */
+{
+       struct proc_entry *proc;
+
+       cds_list_for_each_entry(proc, &proc_list.head, entry) {
+               if (proc->pid && proc->pid == pid)      /* reuse the entry already tracking this pid */
+                       return proc;
+       }
+
+       proc = xzmalloc(sizeof(*proc)); /* NOTE(review): presumably zero-filled, so counters start at 0 - confirm */
+
+       bug_on(gettimeofday(&proc->last_update, NULL));
+       CDS_INIT_LIST_HEAD(&proc->flows);       /* no flows linked yet */
+       proc->pid = pid;
+
+       cds_list_add_tail(&proc->entry, &proc_list.head);       /* append to the global process list */
+
+       return proc;
+}
+
+static void proc_entry_xfree_rcu(struct rcu_head *head)
+{
+       struct proc_entry *p = container_of(head, struct proc_entry, rcu);
+
+       xfree(p);
+}
+
+static void proc_list_destroy(struct proc_list *pl)
+{
+       struct proc_entry *p, *tmp;
+
+       cds_list_for_each_entry_safe(p, tmp, &pl->head, entry) {
+               cds_list_del_rcu(&p->entry);
+               call_rcu(&p->rcu, proc_entry_xfree_rcu);
+       }
+}
+
 static void flow_entry_find_process(struct flow_entry *n)
 {
+       struct proc_entry *p;
        char cmdline[512];
        pid_t pid;
        int ret;
 
        ret = proc_find_by_inode(n->inode, cmdline, sizeof(cmdline), &pid);
-       if (ret <= 0) {
-               n->procname[0] = '\0';
+       if (ret <= 0)
                return;
-       }
 
-       if (snprintf(n->procname, sizeof(n->procname), "%s", basename(cmdline)) 
< 0)
-               n->procname[0] = '\0';
+       p = proc_list_new_entry(pid);
+
+       if (snprintf(p->name, sizeof(p->name), "%s", basename(cmdline)) < 0)
+               p->name[0] = '\0';
 
-       n->procnum = pid;
+       p->pkts_src += n->pkts_src;
+       p->pkts_dst += n->pkts_dst;
+       p->bytes_src += n->bytes_src;
+       p->bytes_dst += n->bytes_dst;
+       p->flows_count++;
+
+       cds_list_add(&n->proc_head, &p->flows);
+       n->proc = p;
 }
 
 static int get_port_inode(uint16_t port, int proto, bool is_ip6)
@@ -513,6 +602,19 @@ static int get_port_inode(uint16_t port, int proto, bool 
is_ip6)
 
 static void flow_entry_from_ct(struct flow_entry *n, const struct nf_conntrack 
*ct)
 {
+       uint64_t bytes_src = nfct_get_attr_u64(ct, ATTR_ORIG_COUNTER_BYTES);
+       uint64_t bytes_dst = nfct_get_attr_u64(ct, ATTR_REPL_COUNTER_BYTES);
+       uint64_t pkts_src  = nfct_get_attr_u64(ct, ATTR_ORIG_COUNTER_PACKETS);
+       uint64_t pkts_dst  = nfct_get_attr_u64(ct, ATTR_REPL_COUNTER_PACKETS);
+
+       /* Update stats diff to the related process entry */
+       if (n->proc) {
+               n->proc->pkts_src += pkts_src - n->pkts_src;
+               n->proc->pkts_dst += pkts_dst - n->pkts_dst;
+               n->proc->bytes_src += bytes_src - n->bytes_src;
+               n->proc->bytes_dst += bytes_dst - n->bytes_dst;
+       }
+
        CP_NFCT(l3_proto, ATTR_ORIG_L3PROTO, 8);
        CP_NFCT(l4_proto, ATTR_ORIG_L4PROTO, 8);
 
@@ -916,10 +1018,11 @@ static void draw_flow_entry(const struct flow_entry *n)
        ui_table_row_add(&flows_tbl);
 
        /* Application */
-       ui_table_row_col_set(&flows_tbl, TBL_FLOW_PROCESS, n->procname);
+       ui_table_row_col_set(&flows_tbl, TBL_FLOW_PROCESS,
+                             n->proc ?  n->proc->name : "");
 
        /* PID */
-       slprintf(tmp, sizeof(tmp), "%.d", n->procnum);
+       slprintf(tmp, sizeof(tmp), "%.d", n->proc ? n->proc->pid : 0);
        ui_table_row_col_set(&flows_tbl, TBL_FLOW_PID, tmp);
 
        /* L4 protocol */
@@ -1010,6 +1113,37 @@ static inline bool presenter_flow_wrong_state(struct 
flow_entry *n)
        return true;
 }
 
+static void draw_filter_status(char *title, int count, int skip_lines)
+{
+       mvwprintw(screen, 1, 0, "%*s", COLS - 1, " ");
+       mvwprintw(screen, 1, 2, "%s(%u) for ", title, count);
+
+       if (what & INCLUDE_IPV4)
+               printw("IPv4,");
+       if (what & INCLUDE_IPV6)
+               printw("IPv6,");
+       if (what & INCLUDE_TCP)
+               printw("TCP,");
+       if (what & INCLUDE_UDP)
+               printw("UDP,");
+       if (what & INCLUDE_SCTP)
+               printw("SCTP,");
+       if (what & INCLUDE_DCCP)
+               printw("DCCP,");
+       if (what & INCLUDE_ICMP && what & INCLUDE_IPV4)
+               printw("ICMP,");
+       if (what & INCLUDE_ICMP && what & INCLUDE_IPV6)
+               printw("ICMP6,");
+       if (show_active_only)
+               printw("Active,");
+
+       printw(" [+%d]", skip_lines);
+
+       if (is_flow_collecting)
+               printw(" [Collecting flows ...]");
+
+}
+
 static void draw_flows(WINDOW *screen, struct flow_list *fl,
                       int skip_lines)
 {
@@ -1051,29 +1185,73 @@ static void draw_flows(WINDOW *screen, struct flow_list 
*fl,
        mvwprintw(screen, 1, 0, "%*s", COLS - 1, " ");
        mvwprintw(screen, 1, 2, "Kernel netfilter flows(%u) for ", flows);
 
-       if (what & INCLUDE_IPV4)
-               printw("IPv4,");
-       if (what & INCLUDE_IPV6)
-               printw("IPv6,");
-       if (what & INCLUDE_TCP)
-               printw("TCP,");
-       if (what & INCLUDE_UDP)
-               printw("UDP,");
-       if (what & INCLUDE_SCTP)
-               printw("SCTP,");
-       if (what & INCLUDE_DCCP)
-               printw("DCCP,");
-       if (what & INCLUDE_ICMP && what & INCLUDE_IPV4)
-               printw("ICMP,");
-       if (what & INCLUDE_ICMP && what & INCLUDE_IPV6)
-               printw("ICMP6,");
-       if (show_active_only)
-               printw("Active,");
+       draw_filter_status("Kernel netfilter flows", flows, skip_lines);
+}
 
-       printw(" [+%d]", skip_lines);
+static void draw_proc_entry(struct proc_entry *p) /* fill one procs_tbl row from p */
+{
+       struct ui_table *tbl = &procs_tbl;      /* every cell below goes to the process table */
+       char tmp[128];
 
-       if (is_flow_collecting)
-               printw(" [Collecting flows ...]");
+       ui_table_row_add(tbl);
+
+       /* Application */
+       ui_table_row_col_set(tbl, TBL_PROC_NAME, p->name);
+
+       /* PID */
+       slprintf(tmp, sizeof(tmp), "%.d", p->pid);
+       ui_table_row_col_set(tbl, TBL_PROC_PID, tmp);
+
+       /* Flows */
+       slprintf(tmp, sizeof(tmp), "%.d", p->flows_count);
+       ui_table_row_col_set(tbl, TBL_PROC_FLOWS, tmp);
+
+       /* Bytes Src */
+       bandw2str(p->bytes_src, tmp, sizeof(tmp) - 1);
+       ui_table_row_col_set(tbl, TBL_PROC_BYTES_SRC, tmp);
+
+       /* Rate Src */
+       rate2str(p->rate_bytes_src, tmp, sizeof(tmp) - 1);
+       ui_table_row_col_set(tbl, TBL_PROC_RATE_SRC, tmp);
+
+       /* Bytes Dest */
+       bandw2str(p->bytes_dst, tmp, sizeof(tmp) - 1);
+       ui_table_row_col_set(tbl, TBL_PROC_BYTES_DST, tmp);
+
+       /* Rate Dest */
+       rate2str(p->rate_bytes_dst, tmp, sizeof(tmp) - 1);
+       ui_table_row_col_set(tbl, TBL_PROC_RATE_DST, tmp);
+
+       ui_table_row_show(tbl);
+}
+
+static void draw_procs(WINDOW *screen, struct flow_list *fl,
+                      int skip_lines)
+{
+       struct proc_entry *proc, *tmp;
+       unsigned int line = 4;
+       int skip = skip_lines;
+       int procs = 0;
+
+       rcu_read_lock();
+
+       ui_table_clear(&procs_tbl);
+       ui_table_header_print(&procs_tbl);
+
+       cds_list_for_each_entry_safe(proc, tmp, &proc_list.head, entry) {
+               if (line + 1 >= rows)
+                       continue;
+               if (--skip >= 0)
+                       continue;
+
+               draw_proc_entry(proc);
+               procs++;
+               line++;
+       }
+
+       rcu_read_unlock();
+
+       draw_filter_status("Processes", procs, skip_lines);
 }
 
 static void draw_help(void)
@@ -1104,26 +1282,27 @@ static void draw_help(void)
        mvaddnstr(row + 2, col + 2, "Navigation", -1);
        attroff(A_BOLD | A_UNDERLINE);
 
-       mvaddnstr(row + 4, col + 3, "Up, u, k      Move up", -1);
-       mvaddnstr(row + 5, col + 3, "Down, d, j    Move down", -1);
-       mvaddnstr(row + 6, col + 3, "Left,l        Scroll left", -1);
-       mvaddnstr(row + 7, col + 3, "Right,h       Scroll right", -1);
-       mvaddnstr(row + 8, col + 3, "?             Toggle help window", -1);
-       mvaddnstr(row + 9, col + 3, "q, Ctrl+C     Quit", -1);
+       mvaddnstr(row + 4, col + 3, "TAB           Go to next tab panel", -1);
+       mvaddnstr(row + 5, col + 3, "Up, u, k      Move up", -1);
+       mvaddnstr(row + 6, col + 3, "Down, d, j    Move down", -1);
+       mvaddnstr(row + 7, col + 3, "Left,l        Scroll left", -1);
+       mvaddnstr(row + 8, col + 3, "Right,h       Scroll right", -1);
+       mvaddnstr(row + 9, col + 3, "?             Toggle help window", -1);
+       mvaddnstr(row + 10, col + 3, "q, Ctrl+C     Quit", -1);
 
        attron(A_BOLD | A_UNDERLINE);
-       mvaddnstr(row + 11, col + 2, "Display Settings", -1);
+       mvaddnstr(row + 12, col + 2, "Display Settings", -1);
        attroff(A_BOLD | A_UNDERLINE);
 
-       mvaddnstr(row + 13, col + 3, "b     Toggle rate units (bits/bytes)", 
-1);
-       mvaddnstr(row + 14, col + 3, "a     Toggle display of active flows 
(rate > 0) only", -1);
-       mvaddnstr(row + 15, col + 3, "s     Toggle show source peer info", -1);
+       mvaddnstr(row + 14, col + 3, "b     Toggle rate units (bits/bytes)", 
-1);
+       mvaddnstr(row + 15, col + 3, "a     Toggle display of active flows 
(rate > 0) only", -1);
+       mvaddnstr(row + 16, col + 3, "s     Toggle show source peer info", -1);
 
-       mvaddnstr(row + 17, col + 3, "T     Toggle display TCP flows", -1);
-       mvaddnstr(row + 18, col + 3, "U     Toggle display UDP flows", -1);
-       mvaddnstr(row + 19, col + 3, "D     Toggle display DCCP flows", -1);
-       mvaddnstr(row + 20, col + 3, "I     Toggle display ICMP flows", -1);
-       mvaddnstr(row + 21, col + 3, "S     Toggle display SCTP flows", -1);
+       mvaddnstr(row + 18, col + 3, "T     Toggle display TCP flows", -1);
+       mvaddnstr(row + 19, col + 3, "U     Toggle display UDP flows", -1);
+       mvaddnstr(row + 20, col + 3, "D     Toggle display DCCP flows", -1);
+       mvaddnstr(row + 21, col + 3, "I     Toggle display ICMP flows", -1);
+       mvaddnstr(row + 22, col + 3, "S     Toggle display SCTP flows", -1);
 }
 
 static void draw_header(WINDOW *screen)
@@ -1203,9 +1382,46 @@ static void flows_table_init(struct ui_table *tbl)
        ui_table_header_color_set(&flows_tbl, COLOR(BLACK, GREEN));
 }
 
+static void procs_table_init(struct ui_table *tbl)
+{
+       ui_table_init(tbl);
+
+       ui_table_pos_set(tbl, 3, 0);
+       ui_table_height_set(tbl, LINES - 3);
+
+       ui_table_col_add(tbl, TBL_PROC_NAME, "NAME", 13);
+       ui_table_col_add(tbl, TBL_PROC_PID, "PID", 7);
+       ui_table_col_add(tbl, TBL_PROC_FLOWS, "FLOWS", 7);
+       ui_table_col_add(tbl, TBL_PROC_BYTES_SRC, "BYTES_SRC", 10);
+       ui_table_col_add(tbl, TBL_PROC_BYTES_DST, "BYTES_DST", 10);
+       ui_table_col_add(tbl, TBL_PROC_RATE_SRC, "RATE_SRC", 14);
+       ui_table_col_add(tbl, TBL_PROC_RATE_DST, "RATE_DST", 14);
+
+       ui_table_col_align_set(tbl, TBL_PROC_BYTES_SRC, UI_ALIGN_RIGHT);
+       ui_table_col_align_set(tbl, TBL_PROC_RATE_SRC, UI_ALIGN_RIGHT);
+       ui_table_col_align_set(tbl, TBL_PROC_BYTES_DST, UI_ALIGN_RIGHT);
+       ui_table_col_align_set(tbl, TBL_PROC_RATE_DST, UI_ALIGN_RIGHT);
+
+       ui_table_col_color_set(tbl, TBL_PROC_NAME, COLOR(YELLOW, BLACK));
+       ui_table_col_color_set(tbl, TBL_PROC_PID, A_BOLD);
+       ui_table_col_color_set(tbl, TBL_PROC_FLOWS, COLOR(YELLOW, BLACK));
+       ui_table_col_color_set(tbl, TBL_PROC_BYTES_SRC, COLOR(RED, BLACK));
+       ui_table_col_color_set(tbl, TBL_PROC_RATE_SRC, COLOR(RED, BLACK));
+       ui_table_col_color_set(tbl, TBL_PROC_BYTES_DST, COLOR(BLUE, BLACK));
+       ui_table_col_color_set(tbl, TBL_PROC_RATE_DST, COLOR(BLUE, BLACK));
+
+       ui_table_header_color_set(tbl, COLOR(BLACK, GREEN));
+}
+
 static void tab_main_on_open(struct ui_tab *tab, enum ui_tab_event_t evt, 
uint32_t id)
 {
-       draw_flows(screen, &flow_list, skip_lines);
+       if (evt != UI_TAB_EVT_OPEN)
+               return;
+
+       if (id == TAB_FLOWS)
+               draw_flows(screen, &flow_list, skip_lines);
+       else if (id == TAB_PROCS)
+               draw_procs(screen, &flow_list, skip_lines);
 }
 
 static void presenter(void)
@@ -1228,12 +1444,14 @@ static void presenter(void)
        INIT_COLOR(BLACK, GREEN);
 
         flows_table_init(&flows_tbl);
+        procs_table_init(&procs_tbl);
 
        tab_main = ui_tab_create();
        ui_tab_event_cb_set(tab_main, tab_main_on_open);
        ui_tab_pos_set(tab_main, 2, 0);
        ui_tab_active_color_set(tab_main, COLOR(BLACK, GREEN));
        ui_tab_entry_add(tab_main, TAB_FLOWS, "Flows");
+       ui_tab_entry_add(tab_main, TAB_PROCS, "Process");
 
        rcu_register_thread();
        while (!sigint) {
@@ -1314,6 +1532,7 @@ static void presenter(void)
        rcu_unregister_thread();
 
        ui_table_uninit(&flows_tbl);
+       ui_table_uninit(&procs_tbl);
        ui_tab_destroy(tab_main);
 
        screen_end();
@@ -1417,6 +1636,39 @@ static int flow_event_cb(enum nf_conntrack_msg_type type,
        }
 }
 
+static void collector_refresh_procs(void) /* prune dead entries, re-sum per-process rates */
+{
+       struct proc_entry *p, *tmp;
+
+       cds_list_for_each_entry_safe(p, tmp, &proc_list.head, entry) {
+               double sec = (double)time_after_us(&p->last_update) / 
USEC_PER_SEC;
+               struct flow_entry *n;
+
+               if (sec < 1)    /* refresh each entry at most once per second */
+                       continue;
+
+               bug_on(gettimeofday(&p->last_update, NULL));
+
+               if (!p->flows_count && !proc_exists(p->pid)) { /* no flows left and /proc/<pid> is gone */
+                       cds_list_del_rcu(&p->entry);
+                       call_rcu(&p->rcu, proc_entry_xfree_rcu);
+                       continue;
+               }
+
+               p->rate_bytes_src = 0;  /* rates are recomputed from scratch on every refresh */
+               p->rate_bytes_dst = 0;
+               p->rate_pkts_src = 0;
+               p->rate_pkts_dst = 0;
+
+               cds_list_for_each_entry_rcu(n, &p->flows, proc_head) { /* sum over the flows linked to this process */
+                       p->rate_bytes_src += n->rate_bytes_src;
+                       p->rate_bytes_dst += n->rate_bytes_dst;
+                       p->rate_pkts_src += n->rate_pkts_src;
+                       p->rate_pkts_dst += n->rate_pkts_dst;
+               }
+       }
+}
+
 static void collector_refresh_flows(struct nfct_handle *handle)
 {
        struct flow_entry *n;
@@ -1563,6 +1815,7 @@ static void *collector(void *null __maybe_unused)
        struct nfct_handle *ct_event;
        struct pollfd poll_fd[1];
 
+       proc_list_init(&proc_list);
        flow_list_init(&flow_list);
 
        ct_event = nfct_open(CONNTRACK, NF_NETLINK_CONNTRACK_NEW |
@@ -1600,6 +1853,7 @@ static void *collector(void *null __maybe_unused)
                        collector_dump_flows();
                }
 
+               collector_refresh_procs();
                collector_refresh_flows(ct_event);
 
                status = poll(poll_fd, 1, 0);
@@ -1615,6 +1869,7 @@ static void *collector(void *null __maybe_unused)
        }
 
        flow_list_destroy(&flow_list);
+       proc_list_destroy(&proc_list);
 
        rcu_unregister_thread();
 
diff --git a/proc.c b/proc.c
index c25396a..a0e6499 100644
--- a/proc.c
+++ b/proc.c
@@ -183,3 +183,14 @@ int proc_exec(const char *proc, char *const argv[])
 
        return 0;
 }
+
+bool proc_exists(pid_t pid) /* true when stat() on /proc/<pid> succeeds */
+{
+       struct stat statbuf;
+       char path[1024];
+
+       if (snprintf(path, sizeof(path), "/proc/%d", (int)pid) < 0)     /* pid_t is signed: %d, not %u */
+               return false;
+
+       return stat(path, &statbuf) == 0;
+}
diff --git a/proc.h b/proc.h
index 6e5f3ac..4c34a64 100644
--- a/proc.h
+++ b/proc.h
@@ -1,7 +1,7 @@
 #ifndef PROC_H
 #define PROC_H
 
-#include <stdlib.h>
+#include <stdbool.h>
 
 extern void cpu_affinity(int cpu);
 extern int set_proc_prio(int prio);
@@ -9,5 +9,6 @@ extern int set_sched_status(int policy, int priority);
 extern ssize_t proc_get_cmdline(unsigned int pid, char *cmdline, size_t len);
 extern int proc_exec(const char *proc, char *const argv[]);
 extern int proc_find_by_inode(ino_t ino, char *cmdline, size_t len, pid_t 
*pid);
+extern bool proc_exists(pid_t pid);
 
 #endif /* PROC_H */
-- 
2.11.0

-- 
You received this message because you are subscribed to the Google Groups 
"netsniff-ng" group.
To unsubscribe from this group and stop receiving emails from it, send an email 
to netsniff-ng+unsubscr...@googlegroups.com.
For more options, visit https://groups.google.com/d/optout.

Reply via email to