This is an automated email from the ASF dual-hosted git repository.

alexey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kudu.git


The following commit(s) were added to refs/heads/master by this push:
     new f15a1ca54 [net] DiagnosticSocket wrapper for sock_diag API
f15a1ca54 is described below

commit f15a1ca5476b256671c2369dd4d9faeccfee6b7c
Author: Alexey Serbin <ale...@apache.org>
AuthorDate: Fri Jan 12 12:26:25 2024 -0800

    [net] DiagnosticSocket wrapper for sock_diag API
    
    This patch introduces DiagnosticSocket wrapper for sock_diag() netlink
    subsystem [1][2].  The primary intention behind DiagnosticSocket is to
    fetch information on the RX queue size for a listening IPv4 TCP socket.
    A follow-up patch will use this new facility to populate a new
    server-level metric: that's to track connection request backlog for a
    Kudu server's RPC socket (a.k.a. listened socket backlog, pending
    connections queue, etc.).
    
    Since netlink is a Linux-specific API/subsystem, the DiagnosticSocket
    API is available only on Linux, correspondingly.
    
    This patch includes a few unit tests to provide basic coverage
    for the newly introduced functionality.
    
    [1] https://man7.org/linux/man-pages/man7/sock_diag.7.html
    [2] https://man7.org/linux/man-pages/man7/netlink.7.html
    
    Change-Id: I4a4f37ca4b0df8ca543ec383d89766cbf1b1e526
    Reviewed-on: http://gerrit.cloudera.org:8080/20892
    Tested-by: Alexey Serbin <ale...@apache.org>
    Reviewed-by: Yingchun Lai <laiyingc...@apache.org>
---
 src/kudu/util/CMakeLists.txt                |   5 +
 src/kudu/util/net/diagnostic_socket-test.cc | 165 ++++++++++++++
 src/kudu/util/net/diagnostic_socket.cc      | 328 ++++++++++++++++++++++++++++
 src/kudu/util/net/diagnostic_socket.h       | 121 ++++++++++
 4 files changed, 619 insertions(+)

diff --git a/src/kudu/util/CMakeLists.txt b/src/kudu/util/CMakeLists.txt
index f4d8e5a09..5a4940131 100644
--- a/src/kudu/util/CMakeLists.txt
+++ b/src/kudu/util/CMakeLists.txt
@@ -267,6 +267,10 @@ set(UTIL_SRCS
   zlib.cc
 )
 
+if(NOT APPLE)
+  set(UTIL_SRCS ${UTIL_SRCS} net/diagnostic_socket.cc)
+endif()
+
 if(NOT NO_TESTS)
   set(UTIL_SRCS ${UTIL_SRCS} test_graph.cc)
 endif()
@@ -592,6 +596,7 @@ ADD_KUDU_TEST(yamlreader-test)
 
 if (NOT APPLE)
   ADD_KUDU_TEST(minidump-test)
+  ADD_KUDU_TEST(net/diagnostic_socket-test)
 endif()
 
 #######################################
diff --git a/src/kudu/util/net/diagnostic_socket-test.cc b/src/kudu/util/net/diagnostic_socket-test.cc
new file mode 100644
index 000000000..9f192a45f
--- /dev/null
+++ b/src/kudu/util/net/diagnostic_socket-test.cc
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/diagnostic_socket.h"
+
+#include <netinet/in.h>
+
+#include <cstddef>
+#include <cstdint>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+#include "kudu/util/status.h"
+#include "kudu/util/test_macros.h"
+#include "kudu/util/test_util.h"
+
+using std::string;
+using std::vector;
+
+namespace kudu {
+
+// Test fixture owning a socket which individual scenarios bind to an address
+// and put into the listening state, so it can be inspected via the
+// DiagnosticSocket API.
+class DiagnosticSocketTest : public KuduTest {
+ protected:
+  // The socket under inspection.
+  Socket listener_;
+  // The address of 'listener_' as reported by the socket itself; populated
+  // by BindAndListen().
+  Sockaddr listen_addr_;
+
+  // Parse 'addr_str' and 'port' into a socket address, then bind 'listener_'
+  // to it and start listening with the specified backlog.
+  Status BindAndListen(const string& addr_str, uint16_t port, int listen_backlog = 1) {
+    Sockaddr address;
+    RETURN_NOT_OK(address.ParseString(addr_str, port));
+    return BindAndListen(address, listen_backlog);
+  }
+
+  // Initialize 'listener_' for the address family of 'address', bind it and
+  // start listening with the specified backlog, then store the address
+  // reported by the socket in 'listen_addr_'.
+  Status BindAndListen(const Sockaddr& address, int listen_backlog) {
+    RETURN_NOT_OK(listener_.Init(address.family(), 0));
+    RETURN_NOT_OK(listener_.BindAndListen(address, listen_backlog));
+    return listener_.GetSocketAddress(&listen_addr_);
+  }
+};
+
+// Smoke test: a netlink diagnostic socket can be opened and then closed.
+TEST_F(DiagnosticSocketTest, Basic) {
+  DiagnosticSocket diag_socket;
+  // Opening the netlink socket must succeed.
+  ASSERT_OK(diag_socket.Init());
+  // Close the socket explicitly instead of leaving it to the destructor.
+  ASSERT_OK(diag_socket.Close());
+}
+
+// Query a listening socket by its Socket object and verify the reported
+// addresses, ports, state, and queue sizes.
+TEST_F(DiagnosticSocketTest, ListeningSocket) {
+  constexpr const char* const kListenIp = "127.254.254.254";
+  constexpr uint16_t kListenPort = 56789;
+  constexpr int kBacklog = 8;
+
+  ASSERT_OK(BindAndListen(kListenIp, kListenPort, kBacklog));
+
+  DiagnosticSocket diag_socket;
+  ASSERT_OK(diag_socket.Init());
+  DiagnosticSocket::TcpSocketInfo socket_info;
+  ASSERT_OK(diag_socket.Query(listener_, &socket_info));
+
+  // The reported endpoints and state must correspond to the listener:
+  // the source is the bound address, the destination is a wildcard.
+  ASSERT_EQ(listen_addr_.ipv4_addr().sin_addr.s_addr, socket_info.src_addr);
+  ASSERT_EQ(INADDR_ANY, socket_info.dst_addr);
+  ASSERT_EQ(kListenPort, ntohs(socket_info.src_port));
+  ASSERT_EQ(0, ntohs(socket_info.dst_port));
+  ASSERT_EQ(DiagnosticSocket::SS_LISTEN, socket_info.state);
+
+  // For a listening socket, the TX queue size is the size of the backlog queue.
+  ASSERT_EQ(kBacklog, socket_info.tx_queue_size);
+
+  // Nobody is connecting to the listening port, so no pending connections.
+  ASSERT_EQ(0, socket_info.rx_queue_size);
+}
+
+// Exercise the pattern-based Query() in three steps: match exactly the
+// listening socket, match any IPv4 TCP socket, and finally verify that nothing
+// matches the listener's address once the listener has been closed.
+TEST_F(DiagnosticSocketTest, SimplePattern) {
+  // Open a socket, bind it, and start listening: this populates listen_addr_
+  // with a valid address that stays usable as a query pattern even after
+  // the socket is closed at the end of the scenario.
+  constexpr const char* const kIpAddr = "127.254.254.254";
+  constexpr uint16_t kPort = 56789;
+  constexpr int kListenBacklog = 5;
+  ASSERT_OK(BindAndListen(kIpAddr, kPort, kListenBacklog));
+
+  const auto& src_addr = listen_addr_;
+  const auto& dst_addr = Sockaddr::Wildcard();
+  const vector<DiagnosticSocket::SocketState> socket_states{ DiagnosticSocket::SS_LISTEN };
+
+  DiagnosticSocket ds;
+  ASSERT_OK(ds.Init());
+
+  // Use a pattern to match only the listening server socket.
+  {
+    vector<DiagnosticSocket::TcpSocketInfo> info;
+    // The query should return success.
+    ASSERT_OK(ds.Query(src_addr, dst_addr, socket_states, &info));
+    ASSERT_EQ(1, info.size());
+    const auto& entry = info.front();
+
+    // Make sure the result matches the input parameters.
+    ASSERT_EQ(src_addr.ipv4_addr().sin_addr.s_addr, entry.src_addr);
+    ASSERT_EQ(INADDR_ANY, entry.dst_addr);
+    ASSERT_EQ(kPort, ntohs(entry.src_port));
+    ASSERT_EQ(0, ntohs(entry.dst_port));
+    ASSERT_EQ(DiagnosticSocket::SS_LISTEN, entry.state);
+
+    // Verify the expected statistics on the server socket.
+    ASSERT_EQ(0, entry.rx_queue_size); // no pending connections
+    ASSERT_EQ(kListenBacklog, entry.tx_queue_size);
+  }
+
+  // Use a pattern to match any IPv4 TCP socket.
+  {
+    const auto& addr_wildcard = Sockaddr::Wildcard();
+    const auto& state_wildcard = DiagnosticSocket::SocketStateWildcard();
+    vector<DiagnosticSocket::TcpSocketInfo> info;
+    // The query should return success.
+    ASSERT_OK(ds.Query(addr_wildcard, addr_wildcard, state_wildcard, &info));
+    ASSERT_GE(info.size(), 1);
+
+    // Make sure the server's socket is reported exactly once: count the entries
+    // whose endpoints and state match the listener.
+    size_t matched_entries = 0;
+    for (const auto& entry : info) {
+      if (src_addr.ipv4_addr().sin_addr.s_addr != entry.src_addr ||
+          INADDR_ANY != entry.dst_addr ||
+          kPort != ntohs(entry.src_port) ||
+          0 != ntohs(entry.dst_port) ||
+          DiagnosticSocket::SS_LISTEN != entry.state) {
+        continue;
+      }
+      ++matched_entries;
+    }
+    ASSERT_EQ(1, matched_entries);
+  }
+
+  // Close the socket; the socket's address in listen_addr_ is still valid.
+  ASSERT_OK(listener_.Close());
+
+  {
+    vector<DiagnosticSocket::TcpSocketInfo> info;
+    // The query should return success.
+    ASSERT_OK(ds.Query(src_addr, dst_addr, socket_states, &info));
+    // However, the list of matching sockets should be empty since the socket
+    // that could match the pattern has just been closed.
+    ASSERT_TRUE(info.empty());
+  }
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/net/diagnostic_socket.cc b/src/kudu/util/net/diagnostic_socket.cc
new file mode 100644
index 000000000..6a2f6a7a6
--- /dev/null
+++ b/src/kudu/util/net/diagnostic_socket.cc
@@ -0,0 +1,328 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "kudu/util/net/diagnostic_socket.h"
+
+#include <linux/inet_diag.h>
+#include <linux/netlink.h>
+#include <linux/sock_diag.h>
+#include <linux/types.h>
+
+#include <netinet/in.h>
+#include <sys/socket.h>
+#include <unistd.h>
+
+#include <cerrno>
+#include <ostream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <glog/logging.h>
+
+#include "kudu/gutil/port.h"
+#include "kudu/gutil/strings/substitute.h"
+#include "kudu/util/errno.h"
+#include "kudu/util/net/sockaddr.h"
+#include "kudu/util/net/socket.h"
+
+using std::string;
+using std::vector;
+using strings::Substitute;
+
+namespace kudu {
+
+// Return the list of all socket states from SS_ESTABLISHED through SS_CLOSING
+// (the SS_UNKNOWN and SS_MAX enumerators are not included). The list is built
+// once on first use and shared by all callers.
+const vector<DiagnosticSocket::SocketState>& DiagnosticSocket::SocketStateWildcard() {
+  static const vector<DiagnosticSocket::SocketState> kSocketStateWildcard {
+    SS_ESTABLISHED,
+    SS_SYN_SENT,
+    SS_SYN_RECV,
+    SS_FIN_WAIT1,
+    SS_FIN_WAIT2,
+    SS_TIME_WAIT,
+    SS_CLOSE,
+    SS_CLOSE_WAIT,
+    SS_LAST_ACK,
+    SS_LISTEN,
+    SS_CLOSING,
+  };
+  return kSocketStateWildcard;
+}
+
+// Construct an object with no open descriptor: -1 marks the socket as not
+// opened until Init() is called.
+DiagnosticSocket::DiagnosticSocket() : fd_(-1) {}
+
+// Close the diagnostic socket if it's still open. A failure to close cannot be
+// reported from a destructor, so it is only logged as a warning.
+DiagnosticSocket::~DiagnosticSocket() {
+  const Status close_status = Close();
+  WARN_NOT_OK(close_status, "errors on closing diagnostic socket");
+}
+
+// Open a NETLINK_SOCK_DIAG socket in the AF_NETLINK domain and store its
+// descriptor in fd_. The descriptor is opened with SOCK_CLOEXEC.
+//
+// Calling Init() on an object that already holds an open socket replaces the
+// socket with a freshly opened one; the former descriptor is closed, not leaked.
+// If the new socket cannot be opened, the object keeps its former state.
+//
+// Returns Status::RuntimeError carrying the errno if socket() fails.
+Status DiagnosticSocket::Init() {
+  const int fd = ::socket(AF_NETLINK, SOCK_RAW | SOCK_CLOEXEC, NETLINK_SOCK_DIAG);
+  if (fd < 0) {
+    int err = errno;
+    return Status::RuntimeError("unable to open diagnostic socket",
+                                ErrnoToString(err), err);
+  }
+
+  // Release the descriptor left from a prior Init() call, if any: simply
+  // overwriting fd_ would leak it. An error on closing the former descriptor
+  // doesn't affect the usability of the new one, so it's only logged.
+  WARN_NOT_OK(Close(), "errors on closing previously opened diagnostic socket");
+  fd_ = fd;
+
+  return Status::OK();
+}
+
+// Close the diagnostic socket. This is a no-op returning Status::OK() if the
+// socket isn't open.
+//
+// Regardless of the outcome, the object no longer refers to the descriptor
+// once this method returns: on Linux the descriptor is released by close()
+// even when the call reports an error, so neither retrying close() (e.g. upon
+// EINTR) nor calling it again later from the destructor is safe -- by that
+// time the same descriptor number might be in use by another thread.
+//
+// Returns Status::IOError carrying the errno if close() reports an error.
+Status DiagnosticSocket::Close() {
+  if (fd_ < 0) {
+    return Status::OK();
+  }
+  // Forget the descriptor before calling close() to guarantee it's never
+  // closed twice.
+  const int fd = fd_;
+  fd_ = -1;
+  if (::close(fd) < 0) {
+    int err = errno;
+    return Status::IOError("close error", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Fetch information on the IPv4 TCP sockets matching the specified source
+// address, destination address, and set of states. Upon success, the former
+// contents of 'info' are replaced with the matching entries; upon failure,
+// 'info' is left untouched.
+Status DiagnosticSocket::Query(const Sockaddr& socket_src_addr,
+                               const Sockaddr& socket_dst_addr,
+                               const vector<SocketState>& socket_states,
+                               vector<TcpSocketInfo>* info) {
+  DCHECK_GE(fd_, 0) << "requires calling Init() first";
+  DCHECK(info);
+
+  // Fold the requested states into a bitmask: one bit per state, where
+  // the bit's position is the numeric value of the state.
+  uint32_t states_mask = 0;
+  for (const auto s : socket_states) {
+    states_mask |= (1U << s);
+  }
+  RETURN_NOT_OK(SendRequest(socket_src_addr, socket_dst_addr, states_mask));
+
+  // Accumulate the response in a temporary, so nothing is written into 'info'
+  // if receiving the response fails midway.
+  vector<TcpSocketInfo> entries;
+  RETURN_NOT_OK(ReceiveResponse(&entries));
+  info->swap(entries);
+  return Status::OK();
+}
+
+// Fetch information on the specified socket. Exactly one entry is expected in
+// the response: Status::NotFound is returned if there are none, and
+// Status::InvalidArgument if there is more than one. 'info' is written only
+// upon success.
+Status DiagnosticSocket::Query(const Socket& socket, TcpSocketInfo* info) {
+  DCHECK_GE(fd_, 0) << "requires calling Init() first";
+  DCHECK(info);
+
+  RETURN_NOT_OK(SendRequest(socket));
+  vector<TcpSocketInfo> entries;
+  RETURN_NOT_OK(ReceiveResponse(&entries));
+
+  const auto num_entries = entries.size();
+  if (num_entries == 0) {
+    return Status::NotFound("no matching IPv4 TCP socket found");
+  }
+  if (PREDICT_FALSE(num_entries != 1)) {
+    return Status::InvalidArgument("socket address is ambiguous");
+  }
+
+  *info = entries[0];
+  return Status::OK();
+}
+
+// Send a query about the specified socket. The query pattern is built from
+// the socket's local address and its peer address:
+//   * for a connected socket, the peer address is used as the destination
+//     and the socket is looked up in the SS_ESTABLISHED state
+//   * for a socket without a peer (ENOTCONN), the wildcard address is used
+//     as the destination and the socket is looked up in the SS_LISTEN state
+//
+// Returns Status::NotSupported if either address isn't an IP address; errors
+// on retrieving the addresses (other than ENOTCONN for the peer address) are
+// returned as is.
+Status DiagnosticSocket::SendRequest(const Socket& socket) const {
+  DCHECK_GE(fd_, 0);
+
+  static constexpr const char* const kNonIpErrMsg =
+      "netlink diagnostics is currently supported only on IPv4 TCP sockets";
+
+  Sockaddr src_addr;
+  RETURN_NOT_OK(socket.GetSocketAddress(&src_addr));
+  if (PREDICT_FALSE(!src_addr.is_ip())) {
+    return Status::NotSupported(kNonIpErrMsg);
+  }
+
+  Sockaddr dst_addr;
+  auto s = socket.GetPeerAddress(&dst_addr);
+  if (s.ok()) {
+    if (PREDICT_FALSE(!dst_addr.is_ip())) {
+      return Status::NotSupported(kNonIpErrMsg);
+    }
+  } else {
+    if (PREDICT_TRUE(s.IsNetworkError() && s.posix_code() == ENOTCONN)) {
+      // Assuming it's a listened socket if there isn't a peer at the other side.
+      dst_addr = Sockaddr::Wildcard();
+    } else {
+      return s;
+    }
+  }
+
+  // The destination address determines which socket state to look for.
+  const uint32_t socket_state_bitmask =
+      dst_addr.IsWildcard() ? (1U << SS_LISTEN) : (1U << SS_ESTABLISHED);
+  return SendRequest(src_addr, dst_addr, socket_state_bitmask);
+}
+
+// Build a SOCK_DIAG_BY_FAMILY request for IPv4 TCP sockets matching the
+// specified source address, destination address, and bitmask of socket states
+// (bit N set means "include sockets in the state with numeric value N"),
+// and write the request into the diagnostic socket.
+//
+// Returns Status::NotSupported if either address isn't an IP address, and
+// Status::NetworkError carrying the errno if sendmsg() fails.
+Status DiagnosticSocket::SendRequest(const Sockaddr& socket_src_addr,
+                                     const Sockaddr& socket_dst_addr,
+                                     uint32_t socket_states_bitmask) const {
+  DCHECK_GE(fd_, 0);
+
+  // The request below is built from the IPv4 representation of the addresses,
+  // so reject anything else up front instead of reading an address of
+  // a different family as if it were an IPv4 one.
+  if (PREDICT_FALSE(!socket_src_addr.is_ip() || !socket_dst_addr.is_ip())) {
+    return Status::NotSupported(
+        "netlink diagnostics is currently supported only on IPv4 TCP sockets");
+  }
+
+  const in_addr& src_ipv4 = socket_src_addr.ipv4_addr().sin_addr;
+  const auto src_port = socket_src_addr.port();
+  const in_addr& dst_ipv4 = socket_dst_addr.ipv4_addr().sin_addr;
+  const auto dst_port = socket_dst_addr.port();
+
+  // All bits set: used below for the interface and the cookie fields.
+  constexpr uint32_t kWildcard = static_cast<uint32_t>(-1);
+  // All values in inet_diag_sockid are in network byte order.
+  const struct inet_diag_sockid sock_id = {
+    .idiag_sport = htons(src_port),
+    .idiag_dport = htons(dst_port),
+    .idiag_src = { src_ipv4.s_addr, 0, 0, 0, },
+    .idiag_dst = { dst_ipv4.s_addr, 0, 0, 0, },
+    .idiag_if = kWildcard,
+    .idiag_cookie = { kWildcard, kWildcard },
+  };
+
+  // The request is the netlink message header immediately followed by
+  // the inet_diag payload.
+  struct TcpSocketRequest {
+    struct nlmsghdr nlh;
+    struct inet_diag_req_v2 idr;
+  } req = {
+    .nlh = {
+      .nlmsg_len = sizeof(req),
+      .nlmsg_type = SOCK_DIAG_BY_FAMILY,
+      .nlmsg_flags = NLM_F_REQUEST | NLM_F_MATCH,
+    },
+    .idr = {
+      .sdiag_family = AF_INET,
+      .sdiag_protocol = IPPROTO_TCP,
+      .idiag_ext = INET_DIAG_MEMINFO,
+      .pad = 0,
+      .idiag_states = socket_states_bitmask,
+      .id = sock_id,
+    }
+  };
+
+  struct iovec iov = {
+    .iov_base = &req,
+    .iov_len = sizeof(req),
+  };
+  // Only the address family is set in the destination netlink address;
+  // the rest of the fields are zero-initialized.
+  struct sockaddr_nl nladdr = {
+    .nl_family = AF_NETLINK
+  };
+  struct msghdr msg = {
+    .msg_name = &nladdr,
+    .msg_namelen = sizeof(nladdr),
+    .msg_iov = &iov,
+    .msg_iovlen = 1,
+  };
+
+  int rc = -1;
+  RETRY_ON_EINTR(rc, ::sendmsg(fd_, &msg, 0));
+  if (rc < 0) {
+    int err = errno;
+    return Status::NetworkError("sendmsg() failed", ErrnoToString(err), err);
+  }
+  return Status::OK();
+}
+
+// Receive the response for a previously sent request, appending one entry to
+// 'result' per reported socket. Datagrams are read from the diagnostic socket
+// until an NLMSG_DONE message is seen or recvmsg() returns 0.
+//
+// Returns:
+//   * Status::IOError       if recvmsg() fails
+//   * Status::RuntimeError  if the kernel responds with an NLMSG_ERROR message
+//   * Status::Corruption    if the response is truncated, malformed, or
+//                           contains data of unexpected type or address family
+//
+// In case of an error, 'result' might contain the entries appended before
+// the error was detected.
+Status DiagnosticSocket::ReceiveResponse(vector<TcpSocketInfo>* result) const {
+  DCHECK_GE(fd_, 0);
+  DCHECK(result);
+
+  // The contents of the buffer are accessed in-place as nlmsghdr structures,
+  // so the buffer must be aligned correspondingly: a plain uint8_t array is
+  // guaranteed only byte alignment.
+  alignas(struct nlmsghdr) uint8_t buf[8192];
+  struct iovec iov = {
+    .iov_base = buf,
+    .iov_len = sizeof(buf)
+  };
+
+  while (true) {
+    struct sockaddr_nl nladdr = {};
+    struct msghdr msg = {
+      .msg_name = &nladdr,
+      .msg_namelen = sizeof(nladdr),
+      .msg_iov = &iov,
+      .msg_iovlen = 1
+    };
+
+    ssize_t ret = -1;
+    RETRY_ON_EINTR(ret, ::recvmsg(fd_, &msg, 0));
+    if (PREDICT_FALSE(ret < 0)) {
+      int err = errno;
+      return Status::IOError("recvmsg()", ErrnoToString(err), err);
+    }
+    if (ret == 0) {
+      // End of stream.
+      return Status::OK();
+    }
+    // A datagram that doesn't fit into the buffer is truncated, with the excess
+    // bytes discarded: report that instead of silently dropping the entries
+    // in the lost tail of the datagram.
+    if (PREDICT_FALSE((msg.msg_flags & MSG_TRUNC) != 0)) {
+      return Status::Corruption(Substitute(
+          "netlink response is truncated: it does not fit into $0 bytes",
+          sizeof(buf)));
+    }
+
+    const struct nlmsghdr* h = reinterpret_cast<const struct nlmsghdr*>(buf);
+    if (PREDICT_FALSE(!NLMSG_OK(h, ret))) {
+      return Status::Corruption(
+          Substitute("unexpected netlink response size $0", ret));
+    }
+
+    if (PREDICT_FALSE(nladdr.nl_family != AF_NETLINK)) {
+      return Status::Corruption(Substitute(
+          "$0: unexpected address family", static_cast<uint32_t>(nladdr.nl_family)));
+    }
+
+    // A single datagram might contain several netlink messages: NLMSG_NEXT()
+    // advances 'h' and decrements 'ret' by the size of the message passed over.
+    for (; NLMSG_OK(h, ret); h = NLMSG_NEXT(h, ret)) {
+      if (h->nlmsg_type == NLMSG_DONE) {
+        return Status::OK();
+      }
+      if (PREDICT_FALSE(h->nlmsg_type == NLMSG_ERROR)) {
+        // Below, the NLMSG_DATA(h) macro is expanded and C-style casts replaced
+        // with reinterpret_cast<>.
+        const struct nlmsgerr* errdata = reinterpret_cast<const struct nlmsgerr*>(
+            reinterpret_cast<const uint8_t*>(h) + NLMSG_LENGTH(0));
+        if (PREDICT_FALSE(h->nlmsg_len < NLMSG_LENGTH(sizeof(*errdata)))) {
+          return Status::Corruption("NLMSG error message is too short");
+        }
+        // The error code in the message is a negated errno value.
+        const int err = -errdata->error;
+        return Status::RuntimeError("netlink error", ErrnoToString(err), err);
+      }
+
+      if (PREDICT_FALSE(h->nlmsg_type != SOCK_DIAG_BY_FAMILY)) {
+        return Status::Corruption(Substitute("$0: unexpected netlink message type",
+                                             static_cast<uint32_t>(h->nlmsg_type)));
+      }
+
+      // Below, the NLMSG_DATA(h) macro is expanded and C-style casts replaced
+      // with reinterpret_cast<>.
+      const struct inet_diag_msg* msg_data = reinterpret_cast<const struct inet_diag_msg*>(
+          reinterpret_cast<const uint8_t*>(h) + NLMSG_LENGTH(0));
+      const uint32_t msg_size = h->nlmsg_len;
+      if (PREDICT_FALSE(msg_size < NLMSG_LENGTH(sizeof(*msg_data)))) {
+        return Status::Corruption(Substitute(
+            "$0: netlink response is too short", msg_size));
+      }
+      // Only IPv4 addresses are expected due to the query pattern.
+      if (PREDICT_FALSE(msg_data->idiag_family != AF_INET)) {
+        return Status::Corruption(Substitute(
+            "$0: unexpected address family in netlink response",
+            static_cast<uint32_t>(msg_data->idiag_family)));
+      }
+
+      DCHECK_LE(SocketState::SS_ESTABLISHED, msg_data->idiag_state);
+      DCHECK_GE(SocketState::SS_CLOSING, msg_data->idiag_state);
+
+      TcpSocketInfo info;
+      info.state = static_cast<SocketState>(msg_data->idiag_state);
+      info.src_addr = msg_data->id.idiag_src[0];  // IPv4 address, network byte order
+      info.dst_addr = msg_data->id.idiag_dst[0];  // IPv4 address, network byte order
+      info.src_port = msg_data->id.idiag_sport;
+      info.dst_port = msg_data->id.idiag_dport;
+      info.rx_queue_size = msg_data->idiag_rqueue;
+      info.tx_queue_size = msg_data->idiag_wqueue;
+      result->emplace_back(info);
+    }
+  }
+}
+
+} // namespace kudu
diff --git a/src/kudu/util/net/diagnostic_socket.h b/src/kudu/util/net/diagnostic_socket.h
new file mode 100644
index 000000000..bee3ba1b1
--- /dev/null
+++ b/src/kudu/util/net/diagnostic_socket.h
@@ -0,0 +1,121 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <cstdint>
+#include <vector>
+
+#include "kudu/gutil/macros.h"
+#include "kudu/gutil/port.h"
+#include "kudu/util/status.h"
+
+namespace kudu {
+
+class Sockaddr;
+class Socket;
+
+// A wrapper around Linux-specific sock_diag() API [1] based on the
+// netlink facility [2] to fetch information on IPv4 TCP sockets.
+//
+// [1] https://man7.org/linux/man-pages/man7/sock_diag.7.html
+// [2] https://man7.org/linux/man-pages/man7/netlink.7.html
+class DiagnosticSocket final {
+ public:
+  // Enum for the socket state. This is modeled after the corresponding
+  // TCP_-prefixed enum in /usr/include/netinet/tcp.h with exact value mapping.
+  // This enum is introduced to decouple the netinet/tcp.h header and the API
+  // of this class.
+  enum SocketState {
+    SS_UNKNOWN = 0,
+    SS_ESTABLISHED,
+    SS_SYN_SENT,
+    SS_SYN_RECV,
+    SS_FIN_WAIT1,
+    SS_FIN_WAIT2,
+    SS_TIME_WAIT,
+    SS_CLOSE,
+    SS_CLOSE_WAIT,
+    SS_LAST_ACK,
+    SS_LISTEN,
+    SS_CLOSING,
+    SS_MAX,
+  };
+
+  // Diagnostic information on a TCP IPv4 socket. That's a subset of the
+  // information available via the netlink data structures.
+  //
+  // TODO(aserbin): if using this API more broadly than fetching information on
+  //                a single socket, consider replacing { addr, port } pairs for
+  //                the source and the destination with Sockaddr class fields.
+  struct TcpSocketInfo {
+    SocketState state;      // current state of the socket
+    uint32_t src_addr;      // IPv4 source address (network byte order)
+    uint32_t dst_addr;      // IPv4 destination address (network byte order)
+    uint16_t src_port;      // source port number (network byte order)
+    uint16_t dst_port;      // destination port number (network byte order)
+    uint32_t rx_queue_size; // RX queue size
+    uint32_t tx_queue_size; // TX queue size
+  };
+
+  // Return wildcard for all the available socket states, i.e. the list of
+  // the states to pass into Query() to match a socket in any state.
+  static const std::vector<SocketState>& SocketStateWildcard();
+
+  // Construct an object. The diagnostic socket isn't opened until Init()
+  // is called.
+  DiagnosticSocket();
+
+  // Close the diagnostic socket. Errors will be logged, but ignored.
+  ~DiagnosticSocket();
+
+  // Open the diagnostic socket of the NETLINK_SOCK_DIAG protocol in the
+  // AF_NETLINK domain, so it's possible to fetch the requested information
+  // from the kernel using the netlink facility via the API of this class.
+  // This method must be called before calling any of the Query() methods.
+  Status Init() WARN_UNUSED_RESULT;
+
+  // Close the diagnostic socket, checking for errors. This is a no-op
+  // if the socket isn't open.
+  Status Close();
+
+  // Get diagnostic information on IPv4 TCP sockets of the specified states
+  // having the specified source and the destination address. Wildcard addresses
+  // are supported. Upon success, the matching entries are output into 'info',
+  // replacing its former contents; an empty 'info' means there were no matches.
+  Status Query(const Sockaddr& socket_src_addr,
+               const Sockaddr& socket_dst_addr,
+               const std::vector<SocketState>& socket_states,
+               std::vector<TcpSocketInfo>* info);
+
+  // Get diagnostic information on the specified socket. This is a handy
+  // shortcut to the Query() method above for a single active socket in the
+  // SS_ESTABLISHED or SS_LISTEN state. Returns Status::NotFound if no socket
+  // matches, and Status::InvalidArgument if more than one socket matches.
+  Status Query(const Socket& socket, TcpSocketInfo* info);
+
+ private:
+  // Build and send netlink request, writing it into the diagnostic socket.
+  // The first method derives the query pattern from the specified socket,
+  // the second one takes the pattern as is, where 'socket_states_bitmask'
+  // has a bit set at the position of each SocketState value of interest.
+  Status SendRequest(const Socket& socket) const;
+  Status SendRequest(const Sockaddr& socket_src_addr,
+                     const Sockaddr& socket_dst_addr,
+                     uint32_t socket_states_bitmask) const;
+
+  // Receive response for a request sent by a method above, adding an entry
+  // into 'result' per each socket reported in the response.
+  Status ReceiveResponse(std::vector<TcpSocketInfo>* result) const;
+
+  // File descriptor of the diagnostic socket (AF_NETLINK domain);
+  // a negative value means the socket isn't open.
+  int fd_;
+
+  DISALLOW_COPY_AND_ASSIGN(DiagnosticSocket);
+};
+
+} // namespace kudu

Reply via email to