# HG changeset patch
# User Roman Arutyunyan <a...@nginx.com>
# Date 1709833123 -28800
#      Fri Mar 08 01:38:43 2024 +0800
# Node ID 5d28510b62bffba3187d7fe69baccd2d2da41a12
# Parent  2ed3f57dca0a664340bca2236c7d614902db4180
QUIC: output packet batching with sendmmsg().

diff --git a/auto/os/linux b/auto/os/linux
--- a/auto/os/linux
+++ b/auto/os/linux
@@ -291,4 +291,16 @@ ngx_feature_test="socklen_t optlen = siz
 . auto/feature
 
 
+ngx_feature="sendmmsg()"
+ngx_feature_name="NGX_HAVE_SENDMMSG"
+ngx_feature_run=no
+ngx_feature_incs="#include <sys/socket.h>
+                  #include <sys/uio.h>"
+ngx_feature_path=
+ngx_feature_libs=
+ngx_feature_test="struct mmsghdr msg[UIO_MAXIOV];
+                  sendmmsg(0, msg, UIO_MAXIOV, 0);"
+. auto/feature
+
+
 CC_AUX_FLAGS="$cc_aux_flags -D_GNU_SOURCE -D_FILE_OFFSET_BITS=64"
diff --git a/auto/sources b/auto/sources
--- a/auto/sources
+++ b/auto/sources
@@ -171,6 +171,7 @@ UNIX_SRCS="$CORE_SRCS $EVENT_SRCS \
             src/os/unix/ngx_writev_chain.c \
             src/os/unix/ngx_udp_send.c \
             src/os/unix/ngx_udp_sendmsg_chain.c \
+            src/os/unix/ngx_udp_sendmmsg.c \
             src/os/unix/ngx_channel.c \
             src/os/unix/ngx_shmem.c \
             src/os/unix/ngx_process.c \
diff --git a/src/event/ngx_event.c b/src/event/ngx_event.c
--- a/src/event/ngx_event.c
+++ b/src/event/ngx_event.c
@@ -823,9 +823,11 @@ ngx_event_process_init(ngx_cycle_t *cycl
         ls[i].connection = c;
 
         rev = c->read;
+        wev = c->write;
 
         rev->log = c->log;
         rev->accept = 1;
+        wev->log = c->log;
 
 #if (NGX_HAVE_DEFERRED_ACCEPT)
         rev->deferred_accept = ls[i].deferred_accept;
@@ -897,6 +899,14 @@ ngx_event_process_init(ngx_cycle_t *cycl
 #if (NGX_QUIC)
         } else if (ls[i].quic) {
             rev->handler = ngx_quic_recvmsg;
+#if (NGX_HAVE_SENDMMSG)
+            wev->handler = ngx_event_sendmmsg;
+
+            c->data = ngx_pcalloc(cycle->pool, sizeof(ngx_sendmmsg_batch_t));
+            if (c->data == NULL) {
+                return NGX_ERROR;
+            }
+#endif
 #endif
         } else {
             rev->handler = ngx_event_recvmsg;
diff --git a/src/event/ngx_event_udp.h b/src/event/ngx_event_udp.h
--- a/src/event/ngx_event_udp.h
+++ b/src/event/ngx_event_udp.h
@@ -22,6 +22,8 @@
 
 #endif
 
+#define NGX_SENDMMSG_BUFFER  4194304 /* 4M */
+
 
 struct ngx_udp_connection_s {
     ngx_rbtree_node_t   node;
@@ -47,6 +49,23 @@ typedef union {
 #endif
 } ngx_addrinfo_t;
 
+
+#if (NGX_HAVE_SENDMMSG)
+
+typedef struct {
+    u_char                buffer[NGX_SENDMMSG_BUFFER];  /* packed messages */
+    struct mmsghdr        msgvec[UIO_MAXIOV];           /* queued headers */
+    size_t                size;                         /* buffer bytes used */
+    ngx_uint_t            vlen;                         /* msgvec slots used */
+} ngx_sendmmsg_batch_t;
+
+
+ssize_t ngx_sendmmsg(ngx_connection_t *c, struct msghdr *msg, int flags);
+u_char *ngx_sendmmsg_buffer(ngx_connection_t *c, size_t size);
+void ngx_event_sendmmsg(ngx_event_t *ev);
+
+#endif
+
 size_t ngx_set_srcaddr_cmsg(struct cmsghdr *cmsg,
     struct sockaddr *local_sockaddr);
 ngx_int_t ngx_get_srcaddr_cmsg(struct cmsghdr *cmsg,
diff --git a/src/event/quic/ngx_event_quic_migration.c b/src/event/quic/ngx_event_quic_migration.c
--- a/src/event/quic/ngx_event_quic_migration.c
+++ b/src/event/quic/ngx_event_quic_migration.c
@@ -908,10 +908,12 @@ ngx_quic_expire_path_mtu_discovery(ngx_c
 static ngx_int_t
 ngx_quic_send_path_mtu_probe(ngx_connection_t *c, ngx_quic_path_t *path)
 {
+    void                   *bt;
     size_t                  mtu;
     uint64_t                pnum;
     ngx_int_t               rc;
     ngx_uint_t              log_error;
+    ngx_connection_t       *lc;
     ngx_quic_frame_t       *frame;
     ngx_quic_send_ctx_t    *ctx;
     ngx_quic_connection_t  *qc;
@@ -933,6 +935,10 @@ ngx_quic_send_path_mtu_probe(ngx_connect
                    "mtu:%uz pnum:%uL tries:%ui",
                    path->seqnum, path->mtud, ctx->pnum, path->tries);
 
+    lc = c->listening->connection;
+    bt = lc->data;
+    lc->data = NULL;
+
     log_error = c->log_error;
     c->log_error = NGX_ERROR_IGNORE_EMSGSIZE;
 
@@ -943,6 +949,7 @@ ngx_quic_send_path_mtu_probe(ngx_connect
 
     path->mtu = mtu;
     c->log_error = log_error;
+    lc->data = bt;
 
     if (rc == NGX_OK) {
         path->mtu_pnum[path->tries] = pnum;
diff --git a/src/event/quic/ngx_event_quic_output.c b/src/event/quic/ngx_event_quic_output.c
--- a/src/event/quic/ngx_event_quic_output.c
+++ b/src/event/quic/ngx_event_quic_output.c
@@ -61,6 +61,7 @@ static void ngx_quic_init_packet(ngx_con
 static ngx_uint_t ngx_quic_get_padding_level(ngx_connection_t *c);
 static ssize_t ngx_quic_send(ngx_connection_t *c, u_char *buf, size_t len,
     struct sockaddr *sockaddr, socklen_t socklen);
+static u_char *ngx_quic_send_buffer(ngx_connection_t *c);
 static void ngx_quic_set_packet_number(ngx_quic_header_t *pkt,
     ngx_quic_send_ctx_t *ctx);
 
@@ -114,14 +115,13 @@ ngx_quic_create_datagrams(ngx_connection
 {
     size_t                  len, min;
     ssize_t                 n;
-    u_char                 *p;
+    u_char                 *dst, *p;
     uint64_t                preserved_pnum[NGX_QUIC_SEND_CTX_LAST];
     ngx_uint_t              i, pad;
     ngx_quic_path_t        *path;
     ngx_quic_send_ctx_t    *ctx;
     ngx_quic_congestion_t  *cg;
     ngx_quic_connection_t  *qc;
-    static u_char           dst[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
 
     qc = ngx_quic_get_connection(c);
     cg = &qc->congestion;
@@ -129,6 +129,7 @@ ngx_quic_create_datagrams(ngx_connection
 
     while (cg->in_flight < cg->window) {
 
+        dst = ngx_quic_send_buffer(c);
         p = dst;
 
         len = ngx_quic_path_limit(c, path, path->mtu);
@@ -688,12 +689,12 @@ static ssize_t
 ngx_quic_send(ngx_connection_t *c, u_char *buf, size_t len,
     struct sockaddr *sockaddr, socklen_t socklen)
 {
-    ssize_t          n;
-    struct iovec     iov;
-    struct msghdr    msg;
+    ssize_t            n;
+    struct iovec       iov;
+    struct msghdr      msg;
 #if (NGX_HAVE_ADDRINFO_CMSG)
-    struct cmsghdr  *cmsg;
-    char             msg_control[CMSG_SPACE(sizeof(ngx_addrinfo_t))];
+    struct cmsghdr    *cmsg;
+    char               msg_control[CMSG_SPACE(sizeof(ngx_addrinfo_t))];
 #endif
 
     ngx_memzero(&msg, sizeof(struct msghdr));
@@ -720,7 +721,11 @@ ngx_quic_send(ngx_connection_t *c, u_cha
     }
 #endif
 
+#if (NGX_HAVE_SENDMMSG)
+    n = ngx_sendmmsg(c, &msg, 0);
+#else
     n = ngx_sendmsg(c, &msg, 0);
+#endif
     if (n < 0) {
         return n;
     }
@@ -731,6 +736,26 @@ ngx_quic_send(ngx_connection_t *c, u_cha
 }
 
 
+static u_char *
+ngx_quic_send_buffer(ngx_connection_t *c)
+{
+    u_char         *out;
+    static u_char   fallback[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
+
+    out = fallback;
+
+#if (NGX_HAVE_SENDMMSG)
+    /* prefer room inside the sendmmsg() batch; NULL means not available */
+    out = ngx_sendmmsg_buffer(c, NGX_QUIC_MAX_UDP_PAYLOAD_SIZE);
+    if (out == NULL) {
+        out = fallback;
+    }
+#endif
+
+    return out;
+}
+
+
 static void
 ngx_quic_set_packet_number(ngx_quic_header_t *pkt, ngx_quic_send_ctx_t *ctx)
 {
@@ -764,9 +789,9 @@ ngx_quic_set_packet_number(ngx_quic_head
 ngx_int_t
 ngx_quic_negotiate_version(ngx_connection_t *c, ngx_quic_header_t *inpkt)
 {
-    size_t             len;
-    ngx_quic_header_t  pkt;
-    static u_char      buf[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
+    u_char             *buf;
+    size_t              len;
+    ngx_quic_header_t   pkt;
 
     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
                    "sending version negotiation packet");
@@ -776,6 +801,8 @@ ngx_quic_negotiate_version(ngx_connectio
     pkt.dcid = inpkt->scid;
     pkt.scid = inpkt->dcid;
 
+    buf = ngx_quic_send_buffer(c);
+
     len = ngx_quic_create_version_negotiation(&pkt, buf);
 
 #ifdef NGX_QUIC_DEBUG_PACKETS
@@ -793,10 +820,9 @@ ngx_int_t
 ngx_quic_send_stateless_reset(ngx_connection_t *c, ngx_quic_conf_t *conf,
     ngx_quic_header_t *pkt)
 {
-    u_char    *token;
+    u_char    *token, *buf;
     size_t     len, max;
     uint16_t   rndbytes;
-    u_char     buf[NGX_QUIC_MAX_SR_PACKET];
 
     ngx_log_debug0(NGX_LOG_DEBUG_EVENT, c->log, 0,
                    "quic handle stateless reset output");
@@ -819,6 +845,8 @@ ngx_quic_send_stateless_reset(ngx_connec
               + NGX_QUIC_MIN_SR_PACKET;
     }
 
+    buf = ngx_quic_send_buffer(c);
+
     if (RAND_bytes(buf, len - NGX_QUIC_SR_TOKEN_LEN) != 1) {
         return NGX_ERROR;
     }
@@ -892,9 +920,7 @@ ngx_quic_send_early_cc(ngx_connection_t 
     ngx_quic_keys_t    keys;
     ngx_quic_frame_t   frame;
     ngx_quic_header_t  pkt;
-
-    static u_char       src[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
-    static u_char       dst[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
+    static u_char      src[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
 
     ngx_memzero(&frame, sizeof(ngx_quic_frame_t));
     ngx_memzero(&pkt, sizeof(ngx_quic_header_t));
@@ -945,7 +971,7 @@ ngx_quic_send_early_cc(ngx_connection_t 
     pkt.payload.data = src;
     pkt.payload.len = len;
 
-    res.data = dst;
+    res.data = ngx_quic_send_buffer(c);
 
     ngx_quic_log_packet(c->log, &pkt);
 
@@ -974,7 +1000,6 @@ ngx_quic_send_retry(ngx_connection_t *c,
     ngx_str_t          res, token;
     ngx_quic_header_t  pkt;
 
-    u_char             buf[NGX_QUIC_RETRY_BUFFER_SIZE];
     u_char             dcid[NGX_QUIC_SERVER_CID_LEN];
     u_char             tbuf[NGX_QUIC_TOKEN_BUF_SIZE];
 
@@ -1008,7 +1033,7 @@ ngx_quic_send_retry(ngx_connection_t *c,
 
     pkt.token = token;
 
-    res.data = buf;
+    res.data = ngx_quic_send_buffer(c);
 
     if (ngx_quic_encrypt(&pkt, &res) != NGX_OK) {
         return NGX_ERROR;
@@ -1193,9 +1218,7 @@ ngx_quic_frame_sendto(ngx_connection_t *
     ngx_quic_send_ctx_t    *ctx;
     ngx_quic_congestion_t  *cg;
     ngx_quic_connection_t  *qc;
-
     static u_char           src[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
-    static u_char           dst[NGX_QUIC_MAX_UDP_PAYLOAD_SIZE];
 
     qc = ngx_quic_get_connection(c);
     cg = &qc->congestion;
@@ -1254,7 +1277,7 @@ ngx_quic_frame_sendto(ngx_connection_t *
     pkt.payload.data = src;
     pkt.payload.len = len;
 
-    res.data = dst;
+    res.data = ngx_quic_send_buffer(c);
 
     ngx_quic_log_packet(c->log, &pkt);
 
diff --git a/src/os/unix/ngx_udp_sendmmsg.c b/src/os/unix/ngx_udp_sendmmsg.c
new file mode 100644
--- /dev/null
+++ b/src/os/unix/ngx_udp_sendmmsg.c
@@ -0,0 +1,185 @@
+
+/*
+ * Copyright (C) Roman Arutyunyan
+ * Copyright (C) Nginx, Inc.
+ */
+
+
+#include <ngx_config.h>
+#include <ngx_core.h>
+#include <ngx_event.h>
+
+
+#if (NGX_HAVE_SENDMMSG)
+
+static void ngx_sendmmsg_flush(ngx_connection_t *lc);
+
+/* Queue one message in the listening connection's batch, or send it now. */
+ssize_t
+ngx_sendmmsg(ngx_connection_t *c, struct msghdr *msg, int flags)
+{
+    u_char                *p;
+    size_t                 size, aux_size;
+    ngx_uint_t             i;
+    struct iovec          *iov;
+    struct msghdr         *nmsg;
+    ngx_connection_t      *lc;
+    ngx_sendmmsg_batch_t  *sb;
+
+    if (c->listening == NULL || c->listening->connection->data == NULL) {
+        return ngx_sendmsg(c, msg, flags);  /* no batch to use */
+    }
+
+    lc = c->listening->connection;
+    sb = lc->data;
+
+    for (i = 0, size = 0; i < msg->msg_iovlen; i++) {
+        size += msg->msg_iov[i].iov_len;
+    }
+    /* NOTE(review): counted when queued, before anything is transmitted */
+    c->sent += size;
+
+    nmsg = &sb->msgvec[sb->vlen++].msg_hdr;
+    /* upper bound for the aligned iovec, name and control data copies */
+    aux_size = NGX_ALIGNMENT + sizeof(struct iovec)
+               + NGX_ALIGNMENT + msg->msg_namelen
+               + NGX_ALIGNMENT + msg->msg_controllen;
+    /* no room: let the slot reference the caller's data and send at once */
+    if (sb->size + size + aux_size > NGX_SENDMMSG_BUFFER) {
+        *nmsg = *msg;
+        goto flush;
+    }
+
+    ngx_memzero(nmsg, sizeof(struct msghdr));
+
+    p = sb->buffer + sb->size;
+    /* payload goes first; no copy if the caller already built it here */
+    for (i = 0; i < msg->msg_iovlen; i++) {
+        if (msg->msg_iov[i].iov_base != p) {
+            ngx_memcpy(p, msg->msg_iov[i].iov_base, msg->msg_iov[i].iov_len);
+        }
+
+        p += msg->msg_iov[i].iov_len;
+    }
+    /* followed by the single iovec that describes the payload */
+    p = ngx_align_ptr(p, NGX_ALIGNMENT);
+    iov = (struct iovec *) p;
+    nmsg->msg_iov = iov;
+    nmsg->msg_iovlen = 1;
+    iov->iov_base = sb->buffer + sb->size;
+    iov->iov_len = size;
+    p += sizeof(struct iovec);
+    /* followed by a copy of msg_name */
+    p = ngx_align_ptr(p, NGX_ALIGNMENT);
+    nmsg->msg_name = p;
+    nmsg->msg_namelen = msg->msg_namelen;
+    p = ngx_cpymem(p, msg->msg_name, msg->msg_namelen);
+    /* followed by a copy of the control data, when present */
+    if (msg->msg_controllen) {
+        p = ngx_align_ptr(p, NGX_ALIGNMENT);
+        nmsg->msg_control = p;
+        nmsg->msg_controllen = msg->msg_controllen;
+        p = ngx_cpymem(p, msg->msg_control, msg->msg_controllen);
+    }
+
+    sb->size = p - sb->buffer;
+
+    ngx_log_debug4(NGX_LOG_DEBUG_EVENT, c->log, 0,
+                   "sendmmsg batch n:%ui s:%uz a:%uz t:%uz",
+                   sb->vlen, size, aux_size, sb->size);
+    /* every msgvec slot is taken */
+    if (sb->vlen == UIO_MAXIOV) {
+        goto flush;
+    }
+    /* post the listening connection's write event to flush the batch later */
+    ngx_post_event(lc->write, &ngx_posted_events);
+
+    return size;
+
+flush:
+
+    ngx_sendmmsg_flush(lc);
+    /* the payload size is returned whatever the outcome of the flush */
+    return size;
+}
+
+
+static void
+ngx_sendmmsg_flush(ngx_connection_t *lc)
+{
+    int                    n;
+    ngx_err_t              err;
+    ngx_uint_t             sent;
+    ngx_sendmmsg_batch_t  *sb;
+
+    sb = lc->data;
+
+    if (sb == NULL) {
+        return;
+    }
+
+    /*
+     * Send the queued messages and empty the batch.  A short sendmmsg()
+     * result is resumed from the first unsent message, EINTR is retried,
+     * a message that fails is logged and skipped, and EAGAIN drops the rest.
+     */
+    for (sent = 0; sent < sb->vlen; /* void */ ) {
+        n = sendmmsg(lc->fd, &sb->msgvec[sent], sb->vlen - sent, 0);
+        err = (n == -1) ? ngx_errno : 0;    /* read before logging */
+        ngx_log_debug3(NGX_LOG_DEBUG_EVENT, lc->log, err,
+                       "sendmmsg: %d of %ui s:%uz",
+                       n, sb->vlen - sent, sb->size);
+
+        if (n > 0) {
+            sent += n;
+
+        } else if (n == 0 || err == NGX_EAGAIN) {
+            break;
+
+        } else if (err != NGX_EINTR) {
+            ngx_connection_error(lc, err, "sendmmsg() failed");
+            sent++;
+        }
+    }
+
+    sb->size = 0;
+    sb->vlen = 0;
+}
+
+
+void
+ngx_event_sendmmsg(ngx_event_t *ev)
+{
+    /*
+     * posted write handler of the listening connection (ev->data):
+     * transmits whatever ngx_sendmmsg() has queued so far
+     */
+    ngx_sendmmsg_flush((ngx_connection_t *) ev->data);
+}
+
+
+u_char *
+ngx_sendmmsg_buffer(ngx_connection_t *c, size_t size)
+{
+    ngx_connection_t      *lc;
+    ngx_sendmmsg_batch_t  *batch;
+
+    if (c->listening == NULL || size > NGX_SENDMMSG_BUFFER) {
+        return NULL;    /* the caller falls back to its own buffer */
+    }
+
+    lc = c->listening->connection;
+    batch = lc->data;
+
+    if (batch == NULL) {
+        return NULL;
+    }
+
+    if (size + batch->size > NGX_SENDMMSG_BUFFER) {
+        ngx_sendmmsg_flush(lc);     /* make room; empties the batch */
+    }
+
+    return batch->buffer + batch->size;
+}
+
+#endif /* NGX_HAVE_SENDMMSG */
_______________________________________________
nginx-devel mailing list
nginx-devel@nginx.org
https://mailman.nginx.org/mailman/listinfo/nginx-devel

Reply via email to