On Mon, Nov 3, 2025 at 3:09 PM Jakub Wartak <[email protected]> wrote: > > Attached is pg_stat_tcpinfo, a heavily work-in-progress, Linux-only > netstat/ss-like extension for showing detailed information about TCP > connections, based on information from the kernel itself. [..]
> Some early feedback on the direction, with a view to bringing this into core, > would be appreciated. Current state: > > 1. Andres is pushing for supporting UNIX domain sockets here, but I'm > not sure it is really worth the effort: it would trigger a new > naming problem ;) and, primarily, make the code even more complex. > IMHO the netlink sock_diag API is already convoluted, and adding AF_UNIX > would make the code even less readable. > 2. IPv6 works, but hasn't been tested much. > 3. The biggest TODO left is probably properly formatting the information > based on struct tcp_info fields (just like ss(1) does, i.e. keeping > the same units/formatting). > 4. Patch/tests are intentionally missing, as I would like to > stabilize the outputs/naming/code first. > 5. [security] Should this be available to pg_monitor/pg_read_all_stats, > or just to superusers? > 6. [security] Should this return info about all TCP connections, or > just those owned by the UID of the postmaster? v2 is attached, with tiny fixes and slightly more readable code (for dumping struct tcp_info *). -J.
From 9994e60137cec47cfca5f97e888f7f1249caf7e0 Mon Sep 17 00:00:00 2001 From: Jakub Wartak <[email protected]> Date: Mon, 3 Nov 2025 14:57:52 +0100 Subject: [PATCH v2] pg_stat_tcpinfo --- contrib/Makefile | 1 + contrib/meson.build | 1 + contrib/pg_stat_tcpinfo/Makefile | 21 + contrib/pg_stat_tcpinfo/meson.build | 25 + .../pg_stat_tcpinfo/pg_stat_tcpinfo--1.0.sql | 29 + contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.c | 1076 +++++++++++++++++ .../pg_stat_tcpinfo/pg_stat_tcpinfo.control | 5 + 7 files changed, 1158 insertions(+) create mode 100644 contrib/pg_stat_tcpinfo/Makefile create mode 100644 contrib/pg_stat_tcpinfo/meson.build create mode 100644 contrib/pg_stat_tcpinfo/pg_stat_tcpinfo--1.0.sql create mode 100644 contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.c create mode 100644 contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.control diff --git a/contrib/Makefile b/contrib/Makefile index 2f0a88d3f77..8060518f118 100644 --- a/contrib/Makefile +++ b/contrib/Makefile @@ -36,6 +36,7 @@ SUBDIRS = \ pg_overexplain \ pg_prewarm \ pg_stat_statements \ + pg_stat_tcpinfo \ pg_surgery \ pg_trgm \ pgrowlocks \ diff --git a/contrib/meson.build b/contrib/meson.build index ed30ee7d639..f37d26ccf40 100644 --- a/contrib/meson.build +++ b/contrib/meson.build @@ -51,6 +51,7 @@ subdir('pg_overexplain') subdir('pg_prewarm') subdir('pgrowlocks') subdir('pg_stat_statements') +subdir('pg_stat_tcpinfo') subdir('pgstattuple') subdir('pg_surgery') subdir('pg_trgm') diff --git a/contrib/pg_stat_tcpinfo/Makefile b/contrib/pg_stat_tcpinfo/Makefile new file mode 100644 index 00000000000..aec1111ce54 --- /dev/null +++ b/contrib/pg_stat_tcpinfo/Makefile @@ -0,0 +1,21 @@ +# contrib/pg_stat_tcpinfo/Makefile + +MODULE_big = pg_stat_tcpinfo +OBJS = pg_stat_tcpinfo.o + +EXTENSION = pg_stat_tcpinfo +DATA = pg_stat_tcpinfo--1.0.sql +PGFILEDESC = "pg_stat_tcpinfo - show detailed TCP connection info on Linux.' 
+ +TAP_TESTS = 1 + +ifdef USE_PGXS +PG_CONFIG = pg_config +PGXS := $(shell $(PG_CONFIG) --pgxs) +include $(PGXS) +else +subdir = contrib/pg_stat_tcpinfo +top_builddir = ../.. +include $(top_builddir)/src/Makefile.global +include $(top_srcdir)/contrib/contrib-global.mk +endif diff --git a/contrib/pg_stat_tcpinfo/meson.build b/contrib/pg_stat_tcpinfo/meson.build new file mode 100644 index 00000000000..b78a5e25ec4 --- /dev/null +++ b/contrib/pg_stat_tcpinfo/meson.build @@ -0,0 +1,25 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group + +pg_stat_tcpinfo_sources = files( + 'pg_stat_tcpinfo.c', +) + +pg_stat_tcpinfo = shared_module('pg_stat_tcpinfo', + pg_stat_tcpinfo_sources, + kwargs: contrib_mod_args + { + 'dependencies': contrib_mod_args['dependencies'], + }, +) +contrib_targets += pg_stat_tcpinfo + +install_data( + 'pg_stat_tcpinfo.control', + 'pg_stat_tcpinfo--1.0.sql', + kwargs: contrib_data_args, +) + +tests += { + 'name': 'pg_stat_tcpinfo', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), +} diff --git a/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo--1.0.sql b/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo--1.0.sql new file mode 100644 index 00000000000..83b18225a0d --- /dev/null +++ b/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo--1.0.sql @@ -0,0 +1,29 @@ +-- tcpinfo--1.0.sql +-- complain if script is sourced in psql, rather than via CREATE EXTENSION +\echo Use "CREATE EXTENSION pg_stat_tcpinfo" to load this file. 
\quit + +CREATE FUNCTION pg_stat_get_tcpinfo( + OUT pid integer, + OUT uid integer, + OUT src_addr inet, + OUT src_port integer, + OUT dst_addr inet, + OUT dst_port integer, + OUT state text, + OUT recvq integer, + OUT sendq integer, + OUT tcpinfo jsonb +) +RETURNS SETOF record +AS '$libdir/pg_stat_tcpinfo', 'pg_stat_get_tcpinfo' +LANGUAGE C STRICT VOLATILE; + +COMMENT ON FUNCTION pg_stat_get_tcpinfo() +IS 'Shows detailed TCP connection information on Linux.'; + +CREATE VIEW pg_stat_tcpinfo AS SELECT * FROM pg_stat_get_tcpinfo(); + +GRANT EXECUTE ON FUNCTION pg_stat_get_tcpinfo() TO pg_monitor; +GRANT EXECUTE ON FUNCTION pg_stat_get_tcpinfo() TO pg_read_all_stats; +GRANT SELECT ON pg_stat_tcpinfo TO pg_monitor; +GRANT SELECT ON pg_stat_tcpinfo TO pg_read_all_stats; diff --git a/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.c b/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.c new file mode 100644 index 00000000000..2ec04d77e32 --- /dev/null +++ b/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.c @@ -0,0 +1,1076 @@ +/*------------------------------------------------------------------------- + * + * pg_stat_tcpinfo.c + * A netstat/ss-like Linux-only function and view for PostgreSQL. + * + * Copyright (c) 2025, PostgreSQL Global Development Group + * + * IDENTIFICATION + * contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.c + * + *------------------------------------------------------------------------- + */ + +/* + * It works in three main parts: + * 1. Scans /proc/net/tcp* to get a list of all active TCP sockets and their + * 'inode' numbers. + * 2. Scans the /proc filesystem (all /proc/[PID]/fd/ directories) + * to build a map of which PID owns which socket inode (by reading symlink). + * 3. Queries the netlink INET_DIAG interface to get detailed + * TCP info (like RTT, skmem, timers, congestion algorithm used) for all + * connections. + * 4. Joins these three pieces of information and returns them as a set of rows. 
+ * + * This function must be run by a user with sufficient permissions + * (e.g., as part of the 'postgres' superuser) and the PostgreSQL + * server process itself must have permissions to read the + * /proc/[PID]/fd directories of processes owned by other users. + * Without such permissions, the 'pid' column will be NULL for most connections. + */ + +#include "c.h" +#include "postgres.h" + +#include "fmgr.h" +#include "funcapi.h" +#include "utils/builtins.h" +#include "utils/hsearch.h" +#include "lib/stringinfo.h" +#include "miscadmin.h" + +#include <arpa/inet.h> +#include <asm/types.h> +#include <ctype.h> +#include <dirent.h> +#include <linux/inet_diag.h> +#include <linux/netlink.h> +#include <linux/rtnetlink.h> +#include <linux/sock_diag.h> +#include <linux/tcp.h> +#include <netinet/in.h> +#include <stdint.h> +#include <stdlib.h> +#include <string.h> +#include <sys/socket.h> +#include <sys/stat.h> +#include <sys/types.h> +#include <unistd.h> +#include <stddef.h> + +PG_MODULE_MAGIC_EXT( + .name = "pg_stat_tcpinfo", + .version = PG_VERSION +); + +#ifdef __linux__ + +/* Linux kernel TCP states, see linux/include/net/tcp_states.h */ +enum +{ + TCP_ESTABLISHED = 1, + TCP_SYN_SENT, + TCP_SYN_RECV, + TCP_FIN_WAIT1, + TCP_FIN_WAIT2, + TCP_TIME_WAIT, + TCP_CLOSE, + TCP_CLOSE_WAIT, + TCP_LAST_ACK, + TCP_LISTEN, + TCP_CLOSING +}; + +/* Map of TCP states to strings */ +static const char *tcp_states_map[] = { + [0] = "UNKNOWN", + [TCP_ESTABLISHED] = "ESTABLISHED", + [TCP_SYN_SENT] = "SYN-SENT", + [TCP_SYN_RECV] = "SYN-RECV", + [TCP_FIN_WAIT1] = "FIN-WAIT-1", + [TCP_FIN_WAIT2] = "FIN-WAIT-2", + [TCP_TIME_WAIT] = "TIME-WAIT", + [TCP_CLOSE] = "CLOSE", + [TCP_CLOSE_WAIT] = "CLOSE-WAIT", + [TCP_LAST_ACK] = "LAST-ACK", + [TCP_LISTEN] = "LISTEN", + [TCP_CLOSING] = "CLOSING" +}; + +/* See enum in netinet/tcp.h, TCP_CLOSING seems to be the last one */ +#define TCP_MAX_STATE TCP_CLOSING + 1 + +/* see sock_diag(7) nearby idiag_timer */ +static const char *tcptimer_names_map[] = { + 
"off", + "on", + "keepalive", + "timewait", + "persist", /* zero probe window */ + "unknown" +}; + +/* + * Used by struct inet_diag_req_v2 -> idiag_states (there are 11 states + * so we need 12 bitmask). It could also be named as TCP_ALL_FLAGS + */ +#define TCPF_ALL 0xFFF + +/* Netlink socket recieve buffer size */ +#define NL_SOCKET_BUFFER_SIZE 8192 + +/* Stores stuff from /proc/net/tcp */ +typedef struct TcpConnection +{ + char local_addr_str[64]; /* IP:port */ + char remote_addr_str[64]; /* IP:port */ + char local_ip_str[INET6_ADDRSTRLEN]; /* IP */ + char remote_ip_str[INET6_ADDRSTRLEN]; /* IP */ + int local_port; + int remote_port; + int state; + int uid; + __u8 family; + unsigned long long inode; + struct TcpConnection *next; +} TcpConnection; + +typedef struct InodePid +{ + unsigned long long inode; + int pid; +} InodePid; + +typedef struct NlDiagInfo +{ + char key[512]; /* Key:"L_IP:L_PORT-R_IP:R_PORT" */ + struct tcp_info tcpi; + __u32 skmem[SK_MEMINFO_VARS]; + + /* + * 12 below because they are not long names, see available + * /lib/modules/$(uname -r)/kernel/net/ipv4/tcp_<cong>.ko kernel modules + */ + char cong[12]; + int has_tcpi; + int has_skmem; + int has_cong; + char tcp_timer_str[64]; +} NlDiagInfo; + + +/* Inserts an inode+PID pair into the hash map. */ +static void +insert_pid(HTAB *pid_map, unsigned long long inode, int pid) +{ + InodePid *entry; + bool found; + + entry = (InodePid *) hash_search(pid_map, &inode, HASH_ENTER, &found); + entry->pid = pid; +} + +/* Locate a PID in the hash map given an inode. Returns PID or -1 */ +static int +find_pid(HTAB *pid_map, unsigned long long inode) +{ + InodePid *entry; + + entry = (InodePid *) hash_search(pid_map, &inode, HASH_FIND, NULL); + if (entry) + return entry->pid; + + return -1; +} + +/* Formats a unique key string for a connection. 
*/ +static void +format_connection_key(char *hash_key, size_t hash_key_sz, + const char *local_addr, int local_port, + const char *remote_addr, int remote_port) +{ + snprintf(hash_key, hash_key_sz, "%s:%d-%s:%d", + local_addr, local_port, remote_addr, remote_port); +} + +/* + * Finds and creates/updates a node in the nldiag_map and stores data in it. + */ +static void +store_netlink_info(HTAB *nldiag_map, + const char *local_addr, const int local_port, + const char *remote_addr, const int remote_port, + const int type, void *data, const char *tcp_timer_str) +{ + char hash_key[512]; + NlDiagInfo *entry; + bool found; + + format_connection_key(hash_key, sizeof(hash_key), local_addr, local_port, remote_addr, remote_port); + entry = (NlDiagInfo *) hash_search(nldiag_map, hash_key, HASH_ENTER, &found); + elog(DEBUG5, "saving some netlink chatter about %s into %p", hash_key, entry); + + if (!found) + { + /* + * New entry. hash_search() copied the key, but the rest of the struct + * is uninitialized. Zero the payload just in case. + */ + memset((char *) entry + offsetof(NlDiagInfo, tcpi), 0, + sizeof(NlDiagInfo) - offsetof(NlDiagInfo, tcpi)); + } + + /* Update the entry (whether new or old). */ + memcpy(entry->tcp_timer_str, tcp_timer_str, sizeof(entry->tcp_timer_str)); + + switch (type) + { + case INET_DIAG_INFO: + memcpy(&entry->tcpi, data, sizeof(struct tcp_info)); + entry->has_tcpi = 1; + break; + case INET_DIAG_SKMEMINFO: + memcpy(entry->skmem, data, sizeof(__u32) * SK_MEMINFO_VARS); + entry->has_skmem = 1; + break; + case INET_DIAG_CONG: + memcpy(entry->cong, data, sizeof(entry->cong)); + entry->has_cong = 1; + break; + default: + elog(WARNING, "unsupported inet diag type reply"); + } +} + +/* + * Finds netlink info in the hash map. + * Returns pointer to NlDiagInfo, or NULL if not found. 
+ */ +static NlDiagInfo * +find_netlink_info(HTAB *nldiag_map, + const char *local_addr, const int local_port, + const char *remote_addr, const int remote_port) +{ + char hash_key[512]; + NlDiagInfo *entry; + + format_connection_key(hash_key, sizeof(hash_key), local_addr, local_port, remote_addr, remote_port); + entry = (NlDiagInfo *) hash_search(nldiag_map, hash_key, HASH_FIND, NULL); + elog(DEBUG5, "nldiag_map returning about %s --> %p", hash_key, entry); + return entry; +} + +/* + * Please see man sock_diag(7) on Linux for details about this API. + */ +static int +send_diag_msg(int sockfd, __u8 family) +{ + struct msghdr msg; + struct nlmsghdr nlh; + struct inet_diag_req_v2 conn_req; + struct sockaddr_nl sa; + struct iovec iov[4]; + int retval = 0; + + elog(DEBUG1, "quering netlink socket for TCP low-level stats"); + + memset(&msg, 0, sizeof(msg)); + memset(&sa, 0, sizeof(sa)); + memset(&nlh, 0, sizeof(nlh)); + memset(&conn_req, 0, sizeof(conn_req)); + + sa.nl_family = AF_NETLINK; + conn_req.sdiag_family = family; + conn_req.sdiag_protocol = IPPROTO_TCP; + + /* + * Do not filter out any TCP states (include all). + * + * Maybe we should filter-out everything else here than TCP_ESTABLISHED? + * But somehow stuck connections (e.g. in TCP_SYN_SENT) seems to be useful + * info on it's own. 
Anyway, filtering out could work that way: + * + * conn_req.idiag_states = TCPF_ALL & ~((1 << TCP_SYN_RECV) | (1 << + * TCP_TIME_WAIT) | (1 << TCP_CLOSE)); + */ + conn_req.idiag_states = TCPF_ALL; + + /* Request extended TCP information: see linux/inet_diag include */ + conn_req.idiag_ext |= (1 << (INET_DIAG_INFO - 1)); + conn_req.idiag_ext |= (1 << (INET_DIAG_SKMEMINFO - 1)); + conn_req.idiag_ext |= (1 << (INET_DIAG_CONG - 1)); + /* XXX: we could query for INET_DIAG_VEGASINFO too */ +#ifdef INET_DIAG_BBRINFO + conn_req.idiag_ext |= (1 << (INET_DIAG_BBRINFO - 1)); +#endif + + nlh.nlmsg_len = NLMSG_LENGTH(sizeof(conn_req)); + nlh.nlmsg_flags = NLM_F_DUMP | NLM_F_REQUEST; + nlh.nlmsg_type = SOCK_DIAG_BY_FAMILY; + iov[0].iov_base = (void *) &nlh; + iov[0].iov_len = sizeof(nlh); + iov[1].iov_base = (void *) &conn_req; + iov[1].iov_len = sizeof(conn_req); + + msg.msg_name = (void *) &sa; + msg.msg_namelen = sizeof(sa); + msg.msg_iov = iov; + msg.msg_iovlen = 2; + + retval = sendmsg(sockfd, &msg, 0); + + return retval; +} + + +static char * +ms_to_min_sec(unsigned long ms) +{ + static char buffer[64]; + long total_seconds = ms / 1000; + long minutes = total_seconds / 60; + long seconds = total_seconds % 60; + + memset(buffer, 0, sizeof(buffer)); + snprintf(buffer, sizeof(buffer), "%ldmin%ldsec", minutes, seconds); + return buffer; +} + +static void +parse_diag_msg(HTAB *nldiag_map, struct inet_diag_msg *diag_msg, int rtalen) +{ + struct rtattr *attr; + char local_addr_buf[INET6_ADDRSTRLEN]; + char remote_addr_buf[INET6_ADDRSTRLEN]; + char tcp_timer_str[64]; + int local_port, + remote_port; + + memset(local_addr_buf, 0, sizeof(local_addr_buf)); + memset(remote_addr_buf, 0, sizeof(remote_addr_buf)); + + local_port = ntohs(diag_msg->id.idiag_sport); + remote_port = ntohs(diag_msg->id.idiag_dport); + + if (diag_msg->idiag_family == AF_INET) + { + inet_ntop(AF_INET, (struct in_addr *) &(diag_msg->id.idiag_src), + local_addr_buf, INET_ADDRSTRLEN); + inet_ntop(AF_INET, (struct 
in_addr *) &(diag_msg->id.idiag_dst), + remote_addr_buf, INET_ADDRSTRLEN); + } + else if (diag_msg->idiag_family == AF_INET6) + { + inet_ntop(AF_INET6, (struct in_addr6 *) &(diag_msg->id.idiag_src), + local_addr_buf, INET6_ADDRSTRLEN); + inet_ntop(AF_INET6, (struct in_addr6 *) &(diag_msg->id.idiag_dst), + remote_addr_buf, INET6_ADDRSTRLEN); + } + else + { + /* Unknown family, just log it */ + ereport(WARNING, (errmsg("unknown address family in netlink response: %d", diag_msg->idiag_family))); + return; + } + + if (local_addr_buf[0] == 0 || remote_addr_buf[0] == 0) + { + ereport(WARNING, (errmsg("could not get connection information from netlink message"))); + return; + } + + /* Format the TCP timer information that is going to be saved later on. */ + snprintf(tcp_timer_str, sizeof(tcp_timer_str), "%s,%s,%d", + tcptimer_names_map[diag_msg->idiag_timer], + ms_to_min_sec(diag_msg->idiag_expires), + diag_msg->idiag_retrans); + + /* + * XXX: perhaps also save diag_msg->idiag_[rw]queue, but we already have + * it from skmem + */ + + /* Parse the attributes, loop as we'll have multiple of them */ + if (rtalen > 0) + { + attr = (struct rtattr *) (diag_msg + 1); + + while (RTA_OK(attr, rtalen)) + { + int type = attr->rta_type; + + switch (type) + { + case INET_DIAG_INFO: + { + struct tcp_info *tcpi = (struct tcp_info *) RTA_DATA(attr); + + store_netlink_info(nldiag_map, local_addr_buf, local_port, remote_addr_buf, remote_port, + type, tcpi, tcp_timer_str); + break; + } + case INET_DIAG_SKMEMINFO: + { + __u32 *skmem = RTA_DATA(attr); + + store_netlink_info(nldiag_map, local_addr_buf, local_port, remote_addr_buf, remote_port, + type, skmem, tcp_timer_str); + break; + } + case INET_DIAG_CONG: + { + char *cong = (char *) RTA_DATA(attr); + + store_netlink_info(nldiag_map, local_addr_buf, local_port, remote_addr_buf, remote_port, + type, cong, tcp_timer_str); + break; + } + + } + + attr = RTA_NEXT(attr, rtalen); + } + } +} + + +/* + * Scans all /proc/[PID]/fd/ entries to map 
socket inodes to PIDs. + * Fills the provided pid_map hash table. + */ +static void +scan_proc_fds(HTAB *pid_map) +{ + DIR *proc_dir, + *fd_dir; + struct dirent *pid_entry, + *fd_entry; + char fd_path[MAXPGPATH]; + char link_path[MAXPGPATH]; + char link_target[MAXPGPATH]; + ssize_t link_len; + unsigned long long inode; + + elog(DEBUG1, "scanning /proc for PIDs"); + + proc_dir = opendir("/proc"); + if (!proc_dir) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open /proc: %m"))); + return; + } + + /* Iterate over each entry in /proc */ + while ((pid_entry = readdir(proc_dir)) != NULL) + { + /* Check if the directory name is a number (a PID) */ + if (pid_entry->d_type == DT_DIR && isdigit(pid_entry->d_name[0])) + { + int pid = atoi(pid_entry->d_name); + + snprintf(fd_path, sizeof(fd_path), "/proc/%d/fd", pid); + + /* Open the /proc/[PID]/fd directory */ + fd_dir = opendir(fd_path); + if (!fd_dir) + { + ereport(DEBUG4, + (errcode_for_file_access(), + errmsg("could not open directory \"%s\": %m", fd_path))); + continue; + } + + /* Iterate over each file descriptor in /proc/[PID]/fd */ + while ((fd_entry = readdir(fd_dir)) != NULL) + { + snprintf(link_path, sizeof(link_path), "%s/%s", fd_path, fd_entry->d_name); + + /* Read the symbolic link target */ + link_len = readlink(link_path, link_target, sizeof(link_target) - 1); + if (link_len == -1) + { + /* Failed to read link */ + ereport(DEBUG4, + (errcode_for_file_access(), + errmsg("could not read link \"%s\": %m", link_path))); + continue; + } + link_target[link_len] = '\0'; + + /* Check if it's a socket */ + if (strncmp(link_target, "socket:[", strlen("socket:[")) == 0) + { + if (sscanf(link_target, "socket:[%llu]", &inode) == 1) + { + /* Add this inode->PID mapping to our hash table */ + insert_pid(pid_map, inode, pid); + } + } + } + closedir(fd_dir); + } + } + closedir(proc_dir); +} + + +/* + * Reads /proc/net/tcp* and builds a linked list of connections. 
+ * Returns pointer to the head of the TcpConnection linked list. + */ +static TcpConnection * read_tcp_connections(__u8 family) +{ + FILE *fp; + char line[1024], + *tcp_file_name; + TcpConnection *head = NULL; + int local_port, + remote_port, + state, + uid, + slot; + unsigned long local_ip_hex, + remote_ip_hex; + struct in6_addr local_ip6_hex, + remote_ip6_hex; + unsigned long long inode; + TcpConnection *conn; + + tcp_file_name = family == AF_INET ? "/proc/net/tcp" : "/proc/net/tcp6"; + elog(DEBUG1, "scanning %s for TCP connections and inodes", tcp_file_name); + + fp = fopen(tcp_file_name, "r"); + if (fp == NULL) + { + ereport(ERROR, + (errcode_for_file_access(), + errmsg("could not open %s: %m", tcp_file_name))); + return NULL; + } + + /* Skip the header line */ + if (fgets(line, sizeof(line), fp) == NULL) + { + fclose(fp); + ereport(ERROR, + (errcode(ERRCODE_IO_ERROR), + errmsg("could not read header from %s", tcp_file_name))); + return NULL; + } + + /* Read each line */ + while (fgets(line, sizeof(line), fp) != NULL) + { + int num_matched, + proper_matches; + + if (family == AF_INET) + { + num_matched = sscanf(line, "%4d: %08lX:%04X %08lX:%04X %02X %*s %*s %*s %d %*s %llu", + &slot, + &local_ip_hex, + &local_port, + &remote_ip_hex, + &remote_port, + &state, + &uid, + &inode); + proper_matches = 8; + } + else + { + /* + * Madness? This ... is ... sparta! 
+ * + * /proc files for IPv6 tend to use 32-char hex representation of + * IPv6 address + */ + num_matched = sscanf(line, "%4d: %08X%08X%08X%08X:%04X %08X%08X%08X%08X:%04X %02X %*s %*s %*s %d %*s %llu", + &slot, + &local_ip6_hex.s6_addr32[0], + &local_ip6_hex.s6_addr32[1], + &local_ip6_hex.s6_addr32[2], + &local_ip6_hex.s6_addr32[3], + &local_port, + &remote_ip6_hex.s6_addr32[0], + &remote_ip6_hex.s6_addr32[1], + &remote_ip6_hex.s6_addr32[2], + &remote_ip6_hex.s6_addr32[3], + &remote_port, + &state, + &uid, + &inode); + proper_matches = 14; + } + + if (num_matched < proper_matches) + { + /* Failed to parse, so chomp last new line character and show it */ + line[strlen(line) - 1] = 0; + ereport(WARNING, (errmsg("failed to parse line from %s (got just %d matches): %s", tcp_file_name, num_matched, line))); + continue; + } + + /* Create a new connection node */ + conn = palloc(sizeof(TcpConnection)); + + conn->local_port = local_port; + conn->remote_port = remote_port; + + /* It's already in network byte order (big-endian) */ + if (family == AF_INET) + { + if (inet_ntop(AF_INET, &local_ip_hex, conn->local_ip_str, INET_ADDRSTRLEN) == NULL) + { + ereport(WARNING, (errmsg("inet_ntop() failed for local IP: %m"))); + strncpy(conn->local_ip_str, "INVALID_IP", INET_ADDRSTRLEN); + } + + /* Format remote IP */ + if (inet_ntop(AF_INET, &remote_ip_hex, conn->remote_ip_str, INET_ADDRSTRLEN) == NULL) + { + ereport(WARNING, (errmsg("inet_ntop() failed for remote IP: %m"))); + strncpy(conn->remote_ip_str, "INVALID_IP", INET_ADDRSTRLEN); + } + } + else + { + /* AF_INET6 */ + if (inet_ntop(AF_INET6, &local_ip6_hex, conn->local_ip_str, INET6_ADDRSTRLEN) == NULL) + { + ereport(WARNING, (errmsg("inet_ntop() failed for local IP: %m"))); + strncpy(conn->local_ip_str, "INVALID_IP", INET_ADDRSTRLEN); + } + + /* Format remote IP */ + if (inet_ntop(AF_INET6, &remote_ip6_hex, conn->remote_ip_str, INET6_ADDRSTRLEN) == NULL) + { + ereport(WARNING, (errmsg("inet_ntop() failed for remote IP: %m"))); 
+ strncpy(conn->remote_ip_str, "INVALID_IP", INET_ADDRSTRLEN); + } + + } + + /* Format combined strings */ + snprintf(conn->local_addr_str, sizeof(conn->local_addr_str), "%s:%d", conn->local_ip_str, conn->local_port); + snprintf(conn->remote_addr_str, sizeof(conn->remote_addr_str), "%s:%d", conn->remote_ip_str, conn->remote_port); + + conn->state = state; + conn->uid = uid; + conn->inode = inode; + conn->family = family; + + /* Add to the front of the linked list */ + conn->next = head; + head = conn; + } + + fclose(fp); + return head; +} + + +/* Receive and parse all netlink data, populating nldiag_map */ +static int +recv_diag_msgs(int nl_sock, HTAB *nldiag_map) +{ + uint8_t recv_buf[NL_SOCKET_BUFFER_SIZE]; + int numbytes = 0, + done = 0, + rtalen = 0; + struct inet_diag_msg *diag_msg; + struct nlmsghdr *nlh; + + while (1) + { + numbytes = recv(nl_sock, recv_buf, sizeof(recv_buf), 0); + if (numbytes <= 0) + { + if (numbytes == 0) + ereport(WARNING, (errmsg("netlink socket closed prematurely"))); + else + ereport(WARNING, (errmsg("netlink recv error: %m"))); + break; + /* Exit loop on error or close */ + } + + nlh = (struct nlmsghdr *) recv_buf; + done = 0; + + while (NLMSG_OK(nlh, numbytes)) + { + if (nlh->nlmsg_type == NLMSG_DONE) + { + done = 1; + break; + } + + if (nlh->nlmsg_type == NLMSG_ERROR) + { + struct nlmsgerr *err = (struct nlmsgerr *) NLMSG_DATA(nlh); + + close(nl_sock); + ereport(ERROR, (errmsg("error in netlink message: %s", strerror(-err->error)))); + } + + diag_msg = (struct inet_diag_msg *) NLMSG_DATA(nlh); + rtalen = nlh->nlmsg_len - NLMSG_LENGTH(sizeof(*diag_msg)); + + /* This populates nldiag_map */ + parse_diag_msg(nldiag_map, diag_msg, rtalen); + + nlh = NLMSG_NEXT(nlh, numbytes); + } + + if (done) + break; + } + return 0; +} + + +PG_FUNCTION_INFO_V1(pg_stat_get_tcpinfo); +Datum +pg_stat_get_tcpinfo(PG_FUNCTION_ARGS) +{ + Datum values[10]; + bool nulls[10]; + HTAB *pid_map; + MemoryContext oldcontext; + MemoryContext per_query_ctx; + 
NlDiagInfo *diag_info; + HTAB *nldiag_map; + ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo; + StringInfoData json_buf; + TcpConnection *tcp_connections; + TcpConnection *current; + TupleDesc tupdesc; + Tuplestorestate *tupstore; + __u32 *skmem, + recvq = 0, + sendq = 0; + bool has_data; + bool has_q_data; + const char *state_str; + int nl_sock = 0, + pid; + HASHCTL ctl; + + /* Check to see if caller supports us returning a tuplestore */ + if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("set-valued function called in context that cannot accept a set"))); + if (!(rsinfo->allowedModes & SFRM_Materialize)) + ereport(ERROR, + (errcode(ERRCODE_FEATURE_NOT_SUPPORTED), + errmsg("materialize mode required, but it is not " \ + "allowed in this context"))); + + /* Switch into long-lived context to construct returned data structures */ + per_query_ctx = rsinfo->econtext->ecxt_per_query_memory; + oldcontext = MemoryContextSwitchTo(per_query_ctx); + + /* Build a tuple descriptor for our result type */ + if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE) + elog(ERROR, "return type must be a row type"); + + tupstore = tuplestore_begin_heap(true, false, work_mem); + rsinfo->returnMode = SFRM_Materialize; + rsinfo->setResult = tupstore; + rsinfo->setDesc = tupdesc; + MemoryContextSwitchTo(oldcontext); + + /* Allocate and initialize the hash maps in the CurrentMemoryContext. 
*/ + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = sizeof(unsigned long long); + ctl.entrysize = sizeof(InodePid); + pid_map = hash_create("Inode to PID Map", 1024, &ctl, HASH_ELEM); + + memset(&ctl, 0, sizeof(ctl)); + ctl.keysize = 512; + ctl.entrysize = sizeof(NlDiagInfo); + nldiag_map = hash_create("Netlink Diag Map", 1024, &ctl, HASH_ELEM | HASH_STRINGS); + + /* Load the inode->PID hash map while scanning /proc/PIDs */ + scan_proc_fds(pid_map); + + /* Read all TCP connections from /proc/net/tcp (IPv4 file) */ + tcp_connections = read_tcp_connections(AF_INET); + if (!tcp_connections) + PG_RETURN_VOID(); + + /* Find the tail of linked list of TCP connections ... */ + current = tcp_connections; + while (current->next != NULL) + { + current = current->next; + } + /* ... and append list of IPv6-based TCP connections */ + current->next = read_tcp_connections(AF_INET6); + + /* Open netlink and query about sockets */ + if ((nl_sock = socket(AF_NETLINK, SOCK_DGRAM, NETLINK_INET_DIAG)) == -1) + { + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), + errmsg("could not create netlink socket: %m"))); + } + + /* IPv4 netlink message */ + if (send_diag_msg(nl_sock, AF_INET) < 0) + { + close(nl_sock); + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), + errmsg("could not send AF_INET netlink message: %m"))); + } + if (recv_diag_msgs(nl_sock, nldiag_map)) + { + close(nl_sock); + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), + errmsg("could not process netlink message: %m"))); + } + + /* IPv6 netlink message */ + if (send_diag_msg(nl_sock, AF_INET6) < 0) + { + close(nl_sock); + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), + errmsg("could not send AF_INET6 netlink message: %m"))); + } + + if (recv_diag_msgs(nl_sock, nldiag_map)) + { + close(nl_sock); + ereport(ERROR, (errcode(ERRCODE_IO_ERROR), + errmsg("could not process netlink message: %m"))); + } + + close(nl_sock); + + /* Start populating the tuplestore */ + initStringInfo(&json_buf); + + /* For each TCP connection from linked list */ + 
current = tcp_connections; + while (current != NULL) + { + int i = 0; + + memset(values, 0, sizeof(values)); + memset(nulls, 0, sizeof(nulls)); + + /* pid */ + pid = find_pid(pid_map, current->inode); + if (pid == -1) + nulls[i++] = true; + else + values[i++] = Int32GetDatum(pid); + + /* uid */ + values[i++] = Int32GetDatum(current->uid); + + /* local ip */ + if (strcmp(current->local_ip_str, "INVALID_IP") == 0) + nulls[i++] = true; + else + values[i++] = DirectFunctionCall1(inet_in, CStringGetDatum(current->local_ip_str)); + + /* local port */ + values[i++] = Int32GetDatum(current->local_port); + + /* remote ip */ + if (strcmp(current->remote_ip_str, "INVALID_IP") == 0) + nulls[i++] = true; + else + values[i++] = DirectFunctionCall1(inet_in, CStringGetDatum(current->remote_ip_str)); + + /* remote port */ + values[i++] = Int32GetDatum(current->remote_port); + + /* state */ + state_str = "UNKNOWN"; + if (current->state > 0 && current->state < TCP_MAX_STATE) + state_str = tcp_states_map[current->state]; + values[i++] = CStringGetTextDatum(state_str); + + /* build big JSON with detailed information */ + resetStringInfo(&json_buf); + appendStringInfoChar(&json_buf, '{'); + has_data = false; + has_q_data = false; + + diag_info = find_netlink_info(nldiag_map, + current->local_ip_str, current->local_port, + current->remote_ip_str, current->remote_port + ); + + if (diag_info) + { + /* add TCP timer information */ + appendStringInfo(&json_buf, "\"timer\": \"(%s)\", ", diag_info->tcp_timer_str); + + /* add low-level TCP stats for troubleshooting */ + if (diag_info->has_tcpi) + { + struct tcp_info *tcpi = &diag_info->tcpi; + + +#define appendTcpinfoMember(var, type) \ + appendStringInfo(&json_buf, "\"%s\": " type ", ", #var, tcpi->tcpi_##var) + +#define appendCalculation(var, type, calc) \ + appendStringInfo(&json_buf, "\"%s\": " type ", ", #var, calc) + +#define appendLastTcpinfoMember(var, type) \ + appendStringInfo(&json_buf, "\"%s\": " type, #var, tcpi->tcpi_##var) + + /* 
+ * XXX: with INET_DIAG_VEGASINFO the "rtt" could also be also + * taken from struct tcpvegas_info(?) + */ + appendCalculation(rtt, "%.3f", (double) tcpi->tcpi_rtt / 1000.0); + appendCalculation(rttvar, "%.3f", (double) tcpi->tcpi_rttvar / 1000.0); + appendCalculation(rcv_rtt, "%.3f", (double) tcpi->tcpi_rcv_rtt / 1000.0); + appendCalculation(ato, "%.3f", (double) tcpi->tcpi_ato / 1000.0); + appendCalculation(min_rtt, "%.3f", (double) tcpi->tcpi_min_rtt / 1000.0); + + appendTcpinfoMember(snd_cwnd, "%u"); + appendTcpinfoMember(snd_cwnd, "%u"); + appendTcpinfoMember(sndbuf_limited, "%llu"); + appendTcpinfoMember(rwnd_limited, "%llu"); + appendTcpinfoMember(delivery_rate, "%llu"); + appendTcpinfoMember(state, "%u"); + appendTcpinfoMember(ca_state, "%u"); + appendTcpinfoMember(retransmits, "%u"); + appendTcpinfoMember(probes, "%u"); + appendTcpinfoMember(options, "%u"); + appendTcpinfoMember(snd_wscale, "%u"); + appendTcpinfoMember(rcv_wscale, "%u"); + appendTcpinfoMember(delivery_rate_app_limited, "%u"); + appendTcpinfoMember(total_rto, "%u"); + appendTcpinfoMember(total_rto_recoveries, "%u"); + appendTcpinfoMember(rto, "%u"); + appendTcpinfoMember(snd_mss, "%u"); + appendTcpinfoMember(rcv_mss, "%u"); + appendTcpinfoMember(unacked, "%u"); + appendTcpinfoMember(sacked, "%u"); + appendTcpinfoMember(lost, "%u"); + appendTcpinfoMember(retrans, "%u"); + appendTcpinfoMember(fackets, "%u"); + appendTcpinfoMember(last_data_sent, "%u"); + appendTcpinfoMember(last_ack_sent, "%u"); + appendTcpinfoMember(last_data_recv, "%u"); + appendTcpinfoMember(last_ack_recv, "%u"); + appendTcpinfoMember(pmtu, "%u"); + appendTcpinfoMember(rcv_ssthresh, "%u"); + appendTcpinfoMember(snd_ssthresh, "%u"); + appendTcpinfoMember(snd_cwnd, "%u"); + appendTcpinfoMember(advmss, "%u"); + appendTcpinfoMember(reordering, "%u"); + appendTcpinfoMember(rcv_space, "%u"); + appendTcpinfoMember(total_retrans, "%u"); + appendTcpinfoMember(segs_out, "%u"); + appendTcpinfoMember(segs_in, "%u"); + 
appendTcpinfoMember(notsent_bytes, "%u"); + appendTcpinfoMember(data_segs_out, "%u"); + appendTcpinfoMember(data_segs_in, "%u"); + appendTcpinfoMember(delivered, "%u"); + appendTcpinfoMember(delivered_ce, "%u"); + appendTcpinfoMember(dsack_dups, "%u"); + appendTcpinfoMember(reord_seen, "%u"); + appendTcpinfoMember(rcv_ooopack, "%u"); + appendTcpinfoMember(snd_wnd, "%u"); + appendTcpinfoMember(rcv_wnd, "%u"); + appendTcpinfoMember(rehash, "%u"); + appendTcpinfoMember(total_rto_time, "%u"); + appendTcpinfoMember(pacing_rate, "%llu"); + appendTcpinfoMember(max_pacing_rate, "%llu"); + appendTcpinfoMember(bytes_acked, "%llu"); + appendTcpinfoMember(bytes_received, "%llu"); + appendTcpinfoMember(delivery_rate, "%llu"); + appendTcpinfoMember(busy_time, "%llu"); + appendTcpinfoMember(rwnd_limited, "%llu"); + appendTcpinfoMember(sndbuf_limited, "%llu"); + appendTcpinfoMember(bytes_sent, "%llu"); + appendLastTcpinfoMember(bytes_retrans, "%llu"); + + has_data = true; + } + + /* add detailed TCP buffer sizes as seen by the kerne */ + if (diag_info->has_skmem) + { + if (has_data) + appendStringInfoString(&json_buf, ", "); + skmem = diag_info->skmem; + appendStringInfo(&json_buf, + "\"skmem\": {\"rmem_alloc\": %u, \"rcvbuf\": %u, \"wmem_alloc\": %u, \"sndbuf\": %u, \"fwd_alloc\": %u, \"wmem_queued\": %u, \"optmem\": %u}", + skmem[SK_MEMINFO_RMEM_ALLOC], + skmem[SK_MEMINFO_RCVBUF], + skmem[SK_MEMINFO_WMEM_ALLOC], + skmem[SK_MEMINFO_SNDBUF], + skmem[SK_MEMINFO_FWD_ALLOC], + skmem[SK_MEMINFO_WMEM_QUEUED], + skmem[SK_MEMINFO_OPTMEM]); + + recvq = skmem[SK_MEMINFO_RMEM_ALLOC]; + sendq = skmem[SK_MEMINFO_WMEM_ALLOC]; + + has_data = true; + has_q_data = true; + } + + /* also add TCP congestion used for the connection */ + if (diag_info->has_cong) + { + if (has_data) + appendStringInfoString(&json_buf, ", "); + + /* See info about TCP_CONGESTION in tcp(7). 
*/ + appendStringInfo(&json_buf, "\"congestion\": \"%s\"", diag_info->cong); + + has_data = true; + } + } + appendStringInfoChar(&json_buf, '}'); + + /* Fill the queues: sendq and recvq */ + if (!has_q_data) + { + nulls[i++] = true; + nulls[i++] = true; + } + else + { + values[i++] = Int32GetDatum(recvq); + values[i++] = Int32GetDatum(sendq); + } + + /* Fill in the main tcpinfo JSON coolumn */ + if (!has_data) + nulls[i++] = true; + else + values[i++] = DirectFunctionCall1(jsonb_in, CStringGetDatum(json_buf.data)); + + /* Store the tuple */ + tuplestore_putvalues(tupstore, tupdesc, values, nulls); + + current = current->next; + } + + PG_RETURN_VOID(); +} + +#else + +/* On anything else than Linux. */ +PG_FUNCTION_INFO_V1(pg_stat_get_tcpinfo); +Datum +pg_stat_get_tcpinfo(PG_FUNCTION_ARGS) +{ + elog(ERROR, "pg_stat_tcpinfo is not supported on this platform"); +} +#endif diff --git a/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.control b/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.control new file mode 100644 index 00000000000..0fbf92c095d --- /dev/null +++ b/contrib/pg_stat_tcpinfo/pg_stat_tcpinfo.control @@ -0,0 +1,5 @@ +# tcpinfo.control +comment = 'Provides pg_stat_tcpinfo to show detailed TCP connection info on Linux.' +default_version = '1.0' +module_pathname = '$libdir/pg_stat_tcpinfo' +relocatable = true -- 2.43.0
