This is an automated email from the ASF dual-hosted git repository.

cmcfarlen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/trafficserver.git


The following commit(s) were added to refs/heads/master by this push:
     new 2bd6ff422 Experimental io_uring AIO implementation for disk IO (#8992)
2bd6ff422 is described below

commit 2bd6ff4225425a1ca4bb5139d9f1a05fed003f69
Author: Chris McFarlen <ch...@mcfarlen.us>
AuthorDate: Wed Oct 26 10:05:43 2022 -0500

    Experimental io_uring AIO implementation for disk IO (#8992)
    
    * add io_uring feature to build
    
    * link liburing library. Start adding implementation
    
    * rename to DiskHandler to match native aio
    
    * better way to integrate with net
    
    * wip
    
    * add diskhandler to main thread, link sqe ops for aio ops with multiple 
operations
    
    * cleanup
    
    * build should work without io_uring configured
    
    * add config
    
    * Finish config, attach parent wq, configure bounded/unbounded queues
    
    * restore accidental line removal
    
    * remove unused include
    
    * set io opcode and increment stats
    
    * Forward declare DiskHandler
    
    * io_uring stats
    
    * fix AIO_STAT_COUNT
    
    * fix newline at end of file
    
    Co-authored-by: Chris McFarlen <cmcfar...@apple.com>
---
 configure.ac                         |  28 ++++
 include/tscore/ink_config.h.in       |   1 +
 iocore/aio/AIO.cc                    | 276 +++++++++++++++++++++++++++++++++--
 iocore/aio/I_AIO.h                   |  57 +++++++-
 iocore/aio/P_AIO.h                   |   4 +
 iocore/eventsystem/I_EThread.h       |   2 +-
 iocore/net/P_UnixNet.h               |   4 +
 iocore/net/UnixNet.cc                |  35 +++++
 mgmt/RecordsConfig.cc                |  12 ++
 src/traffic_server/InkAPI.cc         |   2 +-
 src/traffic_server/Makefile.inc      |   1 +
 src/traffic_server/traffic_server.cc |  14 +-
 12 files changed, 417 insertions(+), 19 deletions(-)

diff --git a/configure.ac b/configure.ac
index 6f7f268f8..23ca94fdc 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1687,6 +1687,34 @@ AS_IF([test "x$enable_linux_native_aio" = "xyes"], [
 AC_MSG_RESULT([$enable_linux_native_aio])
 TS_ARG_ENABLE_VAR([use], [linux_native_aio])
 
+# Check for enabling io_uring on linux
+
+AC_MSG_CHECKING([whether to enable Linux io_uring])
+AC_ARG_ENABLE([experimental-linux-io-uring],
+  [AS_HELP_STRING([--enable-experimental-linux-io-uring], [WARNING this is 
experimental: enable Linux io_uring support @<:@default=no@:>@])],
+  [enable_linux_io_uring="${enableval}"],
+  [enable_linux_io_uring=no]
+)
+
+AS_IF([test "x$enable_linux_io_uring" = "xyes"], [
+  URING_LIBS="-luring"
+  if test $host_os_def  != "linux"; then
+    AC_MSG_ERROR([Linux io_uring can only be enabled on Linux systems])
+  fi
+
+  AC_CHECK_HEADERS([liburing.h], [],
+    [AC_MSG_ERROR([Linux io_uring requires liburing.h (install liburing)])]
+  )
+
+  AC_SEARCH_LIBS([io_uring_queue_init], [uring], [AC_SUBST([URING_LIBS])],
+    [AC_MSG_ERROR([Linux io_uring requires liburing])]
+  )
+])
+
+AC_MSG_RESULT([$enable_linux_io_uring])
+TS_ARG_ENABLE_VAR([use], [linux_io_uring])
+
+
 # Check for hwloc library.
 # If we don't find it, disable checking for header.
 use_hwloc=0
diff --git a/include/tscore/ink_config.h.in b/include/tscore/ink_config.h.in
index af3a0bbf3..30737e19c 100644
--- a/include/tscore/ink_config.h.in
+++ b/include/tscore/ink_config.h.in
@@ -80,6 +80,7 @@
 #define TS_USE_TLS_SET_CIPHERSUITES @use_tls_set_ciphersuites@
 #define TS_HAS_TLS_KEYLOGGING @has_tls_keylogging@
 #define TS_USE_LINUX_NATIVE_AIO @use_linux_native_aio@
+#define TS_USE_LINUX_IO_URING @use_linux_io_uring@
 #define TS_USE_REMOTE_UNWINDING @use_remote_unwinding@
 #define TS_USE_TLS_OCSP @use_tls_ocsp@
 #define TS_HAS_TLS_EARLY_DATA @has_tls_early_data@
diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc
index d76ed0bf0..078d2b58d 100644
--- a/iocore/aio/AIO.cc
+++ b/iocore/aio/AIO.cc
@@ -28,8 +28,14 @@
 #include "tscore/TSSystemState.h"
 #include "tscore/ink_hw.h"
 
+#include <atomic>
+
 #include "P_AIO.h"
 
+#if AIO_MODE == AIO_MODE_IO_URING
+#include <sys/eventfd.h>
+#endif
+
 #if AIO_MODE == AIO_MODE_NATIVE
 #define AIO_PERIOD -HRTIME_MSECONDS(10)
 #else
@@ -44,22 +50,35 @@ AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE];
 /* number of unique file descriptors in the aio_reqs array */
 int num_filedes = 1;
 
+#if AIO_MODE == AIO_MODE_THREAD
 // acquire this mutex before inserting a new entry in the aio_reqs array.
 // Don't need to acquire this for searching the array
 static ink_mutex insert_mutex;
+#endif
 
 int thread_is_created = 0;
 #endif // AIO_MODE == AIO_MODE_NATIVE
 RecInt cache_config_threads_per_disk = 12;
 RecInt api_config_threads_per_disk   = 12;
 
+// config for io_uring mode
+#if AIO_MODE == AIO_MODE_IO_URING
+RecInt aio_io_uring_queue_entries = 1024;
+RecInt aio_io_uring_sq_poll_ms    = 0;
+RecInt aio_io_uring_attach_wq     = 0;
+RecInt aio_io_uring_wq_bounded    = 0;
+RecInt aio_io_uring_wq_unbounded  = 0;
+#endif
+
 RecRawStatBlock *aio_rsb      = nullptr;
 Continuation *aio_err_callbck = nullptr;
 // AIO Stats
-uint64_t aio_num_read      = 0;
-uint64_t aio_bytes_read    = 0;
-uint64_t aio_num_write     = 0;
-uint64_t aio_bytes_written = 0;
+std::atomic<uint64_t> aio_num_read         = 0;
+std::atomic<uint64_t> aio_bytes_read       = 0;
+std::atomic<uint64_t> aio_num_write        = 0;
+std::atomic<uint64_t> aio_bytes_written    = 0;
+std::atomic<uint64_t> io_uring_submissions = 0;
+std::atomic<uint64_t> io_uring_completions = 0;
 
 /*
  * Stats
@@ -88,17 +107,25 @@ aio_stats_cb(const char * /* name ATS_UNUSED */, RecDataT 
data_type, RecData *da
   }
   switch (id) {
   case AIO_STAT_READ_PER_SEC:
-    new_val = aio_num_read;
+    new_val = aio_num_read.load();
     break;
   case AIO_STAT_WRITE_PER_SEC:
-    new_val = aio_num_write;
+    new_val = aio_num_write.load();
     break;
   case AIO_STAT_KB_READ_PER_SEC:
-    new_val = aio_bytes_read >> 10;
+    new_val = aio_bytes_read.load() >> 10;
     break;
   case AIO_STAT_KB_WRITE_PER_SEC:
-    new_val = aio_bytes_written >> 10;
+    new_val = aio_bytes_written.load() >> 10;
+    break;
+#if AIO_MODE == AIO_MODE_IO_URING
+  case AIO_STAT_IO_URING_SUBMITTED:
+    new_val = io_uring_submissions.load();
     break;
+  case AIO_STAT_IO_URING_COMPLETED:
+    new_val = io_uring_completions.load();
+    break;
+#endif
   default:
     ink_assert(0);
   }
@@ -159,7 +186,13 @@ ink_aio_init(ts::ModuleVersion v)
                      (int)AIO_STAT_KB_READ_PER_SEC, aio_stats_cb);
   RecRegisterRawStat(aio_rsb, RECT_PROCESS, 
"proxy.process.cache.KB_write_per_sec", RECD_FLOAT, RECP_PERSISTENT,
                      (int)AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb);
-#if AIO_MODE != AIO_MODE_NATIVE
+#if TS_USE_LINUX_IO_URING
+  RecRegisterRawStat(aio_rsb, RECT_PROCESS, 
"proxy.process.io_uring.submitted", RECD_FLOAT, RECP_PERSISTENT,
+                     (int)AIO_STAT_IO_URING_SUBMITTED, aio_stats_cb);
+  RecRegisterRawStat(aio_rsb, RECT_PROCESS, 
"proxy.process.io_uring.completed", RECD_FLOAT, RECP_PERSISTENT,
+                     (int)AIO_STAT_IO_URING_COMPLETED, aio_stats_cb);
+#endif
+#if AIO_MODE == AIO_MODE_THREAD
   memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *));
   ink_mutex_init(&insert_mutex);
 #endif
@@ -167,6 +200,14 @@ ink_aio_init(ts::ModuleVersion v)
 #if TS_USE_LINUX_NATIVE_AIO
   Warning("Running with Linux AIO, there are known issues with this feature");
 #endif
+
+#if TS_USE_LINUX_IO_URING
+  REC_ReadConfigInteger(aio_io_uring_queue_entries, 
"proxy.config.aio.io_uring.entries");
+  REC_ReadConfigInteger(aio_io_uring_sq_poll_ms, 
"proxy.config.aio.io_uring.sq_poll_ms");
+  REC_ReadConfigInteger(aio_io_uring_attach_wq, 
"proxy.config.aio.io_uring.attach_wq");
+  REC_ReadConfigInteger(aio_io_uring_wq_bounded, 
"proxy.config.aio.io_uring.wq_workers_bounded");
+  REC_ReadConfigInteger(aio_io_uring_wq_unbounded, 
"proxy.config.aio.io_uring.wq_workers_unbounded");
+#endif
 }
 
 int
@@ -179,7 +220,7 @@ ink_aio_start()
   return 0;
 }
 
-#if AIO_MODE != AIO_MODE_NATIVE
+#if AIO_MODE == AIO_MODE_THREAD
 
 struct AIOThreadInfo : public Continuation {
   AIO_Reqs *req;
@@ -460,11 +501,11 @@ AIOThreadInfo::aio_thread_main(AIOThreadInfo *thr_info)
 #endif
       // update the stats;
       if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
-        aio_num_write++;
-        aio_bytes_written += op->aiocb.aio_nbytes;
+        aio_num_write.fetch_add(1);
+        aio_bytes_written.fetch_add(op->aiocb.aio_nbytes);
       } else {
-        aio_num_read++;
-        aio_bytes_read += op->aiocb.aio_nbytes;
+        aio_num_read.fetch_add(1);
+        aio_bytes_read.fetch_add(op->aiocb.aio_nbytes);
       }
       ink_mutex_release(&current_req->aio_mutex);
       cache_op((AIOCallbackInternal *)op);
@@ -490,6 +531,213 @@ AIOThreadInfo::aio_thread_main(AIOThreadInfo *thr_info)
   }
   return nullptr;
 }
+
+#elif AIO_MODE == AIO_MODE_IO_URING
+
+std::atomic<int> aio_main_wq_fd;
+
+DiskHandler::DiskHandler()
+{
+  io_uring_params p{};
+
+  if (aio_io_uring_attach_wq > 0) {
+    int wq_fd = get_main_queue_fd();
+    if (wq_fd > 0) {
+      p.flags = IORING_SETUP_ATTACH_WQ;
+      p.wq_fd = wq_fd;
+    }
+  }
+
+  if (aio_io_uring_sq_poll_ms > 0) {
+    p.flags |= IORING_SETUP_SQPOLL;
+    p.sq_thread_idle = aio_io_uring_sq_poll_ms;
+  }
+
+  int ret = io_uring_queue_init_params(aio_io_uring_queue_entries, &ring, &p);
+  if (ret < 0) {
+    throw std::runtime_error(strerror(-ret));
+  }
+
+  /* no sharing for non-fixed either */
+  if (aio_io_uring_sq_poll_ms && !(p.features & IORING_FEAT_SQPOLL_NONFIXED)) {
+    throw std::runtime_error("No SQPOLL sharing with nonfixed");
+  }
+
+  // assign this handler to the thread
+  // TODO(cmcfarlen): maybe a bad place for this!
+  this_ethread()->diskHandler = this;
+}
+
+DiskHandler::~DiskHandler()
+{
+  io_uring_queue_exit(&ring);
+}
+
+void
+DiskHandler::set_main_queue(DiskHandler *dh)
+{
+  dh->set_wq_max_workers(aio_io_uring_wq_bounded, aio_io_uring_wq_unbounded);
+  aio_main_wq_fd.store(dh->ring.ring_fd);
+}
+
+int
+DiskHandler::get_main_queue_fd()
+{
+  return aio_main_wq_fd.load();
+}
+
+int
+DiskHandler::set_wq_max_workers(unsigned int bounded, unsigned int unbounded)
+{
+  if (bounded == 0 && unbounded == 0) {
+    return 0;
+  }
+  unsigned int args[2] = {bounded, unbounded};
+  int result           = io_uring_register_iowq_max_workers(&ring, args);
+  return result;
+}
+
+std::pair<int, int>
+DiskHandler::get_wq_max_workers()
+{
+  unsigned int args[2] = {0, 0};
+  io_uring_register_iowq_max_workers(&ring, args);
+  return std::make_pair(args[0], args[1]);
+}
+
+void
+DiskHandler::submit()
+{
+  io_uring_submissions.fetch_add(io_uring_submit(&ring));
+}
+
+void
+DiskHandler::handle_cqe(io_uring_cqe *cqe)
+{
+  AIOCallback *op = static_cast<AIOCallback *>(io_uring_cqe_get_data(cqe));
+
+  op->aio_result = static_cast<int64_t>(cqe->res);
+  op->link.prev  = nullptr;
+  op->link.next  = nullptr;
+  op->mutex      = op->action.mutex;
+
+  if (op->aiocb.aio_lio_opcode == LIO_WRITE) {
+    aio_num_write.fetch_add(1);
+    aio_bytes_written.fetch_add(op->aiocb.aio_nbytes);
+  } else {
+    aio_num_read.fetch_add(1);
+    aio_bytes_read.fetch_add(op->aiocb.aio_nbytes);
+  }
+
+  // the last op in the linked ops will have the original op stored in the 
aiocb
+  if (op->aiocb.aio_op) {
+    op = op->aiocb.aio_op;
+    if (op->thread == AIO_CALLBACK_THREAD_AIO) {
+      SCOPED_MUTEX_LOCK(lock, op->mutex, this_ethread());
+      op->handleEvent(EVENT_NONE, nullptr);
+    } else if (op->thread == AIO_CALLBACK_THREAD_ANY) {
+      eventProcessor.schedule_imm(op);
+    } else {
+      op->thread->schedule_imm(op);
+    }
+  }
+}
+
+void
+DiskHandler::service()
+{
+  io_uring_cqe *cqe = nullptr;
+  io_uring_peek_cqe(&ring, &cqe);
+  while (cqe) {
+    handle_cqe(cqe);
+    io_uring_completions.fetch_add(1);
+    io_uring_cqe_seen(&ring, cqe);
+
+    cqe = nullptr;
+    io_uring_peek_cqe(&ring, &cqe);
+  }
+}
+
+void
+DiskHandler::submit_and_wait(int ms)
+{
+  ink_hrtime t              = ink_hrtime_from_msec(ms);
+  timespec ts               = ink_hrtime_to_timespec(t);
+  __kernel_timespec timeout = {ts.tv_sec, ts.tv_nsec};
+  io_uring_cqe *cqe         = nullptr;
+
+  io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, nullptr);
+  while (cqe) {
+    handle_cqe(cqe);
+    io_uring_completions.fetch_add(1);
+    io_uring_cqe_seen(&ring, cqe);
+
+    cqe = nullptr;
+    io_uring_peek_cqe(&ring, &cqe);
+  }
+}
+
+int
+DiskHandler::register_eventfd()
+{
+  int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
+
+  io_uring_register_eventfd(&ring, fd);
+
+  return fd;
+}
+
+DiskHandler *
+DiskHandler::local_context()
+{
+  // TODO(cmcfarlen): load config
+  thread_local DiskHandler threadContext;
+
+  return &threadContext;
+}
+
+int
+ink_aio_read(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */)
+{
+  EThread *t      = this_ethread();
+  AIOCallback *op = op_in;
+  while (op) {
+    io_uring_sqe *sqe = t->diskHandler->next_sqe();
+    io_uring_prep_read(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, 
op->aiocb.aio_nbytes, op->aiocb.aio_offset);
+    io_uring_sqe_set_data(sqe, op);
+    op->aiocb.aio_lio_opcode = LIO_READ;
+    if (op->then) {
+      sqe->flags |= IOSQE_IO_LINK;
+    } else {
+      op->aiocb.aio_op = op_in;
+    }
+
+    op = op->then;
+  }
+  return 1;
+}
+
+int
+ink_aio_write(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */)
+{
+  EThread *t      = this_ethread();
+  AIOCallback *op = op_in;
+  while (op) {
+    io_uring_sqe *sqe = t->diskHandler->next_sqe();
+    io_uring_prep_write(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, 
op->aiocb.aio_nbytes, op->aiocb.aio_offset);
+    io_uring_sqe_set_data(sqe, op);
+    op->aiocb.aio_lio_opcode = LIO_WRITE;
+    if (op->then) {
+      sqe->flags |= IOSQE_IO_LINK;
+    } else {
+      op->aiocb.aio_op = op_in;
+    }
+
+    op = op->then;
+  }
+  return 1;
+}
+
 #else
 int
 DiskHandler::startAIOEvent(int /* event ATS_UNUSED */, Event *e)
diff --git a/iocore/aio/I_AIO.h b/iocore/aio/I_AIO.h
index 6144e188f..40a41db34 100644
--- a/iocore/aio/I_AIO.h
+++ b/iocore/aio/I_AIO.h
@@ -40,9 +40,12 @@ static constexpr ts::ModuleVersion 
AIO_MODULE_PUBLIC_VERSION(1, 0, ts::ModuleVer
 
 #define AIO_MODE_THREAD 0
 #define AIO_MODE_NATIVE 1
+#define AIO_MODE_IO_URING 2
 
 #if TS_USE_LINUX_NATIVE_AIO
 #define AIO_MODE AIO_MODE_NATIVE
+#elif TS_USE_LINUX_IO_URING
+#define AIO_MODE AIO_MODE_IO_URING
 #else
 #define AIO_MODE AIO_MODE_THREAD
 #endif
@@ -64,6 +67,21 @@ typedef struct io_event ink_io_event_t;
 #define aio_offset u.c.offset
 #define aio_buf u.c.buf
 
+#elif AIO_MODE == AIO_MODE_IO_URING
+#include <liburing.h>
+
+struct AIOCallback;
+struct ink_aiocb {
+  int aio_fildes    = -1;      /* file descriptor or status: 
AIO_NOT_IN_PROGRESS */
+  void *aio_buf     = nullptr; /* buffer location */
+  size_t aio_nbytes = 0;       /* length of transfer */
+  off_t aio_offset  = 0;       /* file offset */
+
+  int aio_lio_opcode  = 0; /* listio operation */
+  int aio_state       = 0; /* state flag for List I/O */
+  AIOCallback *aio_op = nullptr;
+};
+
 #else
 
 struct ink_aiocb {
@@ -135,14 +153,49 @@ struct DiskHandler : public Continuation {
 };
 #endif
 
+#if AIO_MODE == AIO_MODE_IO_URING
+
+class DiskHandler
+{
+public:
+  DiskHandler();
+  ~DiskHandler();
+
+  io_uring_sqe *
+  next_sqe()
+  {
+    return io_uring_get_sqe(&ring);
+  }
+
+  int set_wq_max_workers(unsigned int bounded, unsigned int unbounded);
+  std::pair<int, int> get_wq_max_workers();
+
+  void submit();
+  void service();
+  void submit_and_wait(int ms);
+
+  int register_eventfd();
+
+  static DiskHandler *local_context();
+  static void set_main_queue(DiskHandler *);
+  static int get_main_queue_fd();
+
+private:
+  io_uring ring;
+
+  void handle_cqe(io_uring_cqe *);
+};
+
+#endif
+
 void ink_aio_init(ts::ModuleVersion version);
 int ink_aio_start();
 void ink_aio_set_callback(Continuation *error_callback);
 
 int ink_aio_read(AIOCallback *op,
-                 int fromAPI = 0); // fromAPI is a boolean to indicate if this 
is from a API call such as upload proxy feature
+                 int fromAPI = 0); // fromAPI is a boolean to indicate if this 
is from an API call such as upload proxy feature
 int ink_aio_write(AIOCallback *op, int fromAPI = 0);
 int ink_aio_readv(AIOCallback *op,
-                  int fromAPI = 0); // fromAPI is a boolean to indicate if 
this is from a API call such as upload proxy feature
+                  int fromAPI = 0); // fromAPI is a boolean to indicate if 
this is from an API call such as upload proxy feature
 int ink_aio_writev(AIOCallback *op, int fromAPI = 0);
 AIOCallback *new_AIOCallback();
diff --git a/iocore/aio/P_AIO.h b/iocore/aio/P_AIO.h
index 18a8690e3..d7b7fd25f 100644
--- a/iocore/aio/P_AIO.h
+++ b/iocore/aio/P_AIO.h
@@ -148,6 +148,10 @@ enum aio_stat_enum {
   AIO_STAT_KB_READ_PER_SEC,
   AIO_STAT_WRITE_PER_SEC,
   AIO_STAT_KB_WRITE_PER_SEC,
+#if AIO_MODE == AIO_MODE_IO_URING
+  AIO_STAT_IO_URING_SUBMITTED,
+  AIO_STAT_IO_URING_COMPLETED,
+#endif
   AIO_STAT_COUNT
 };
 extern RecRawStatBlock *aio_rsb;
diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h
index 16ef7571f..65d43ce26 100644
--- a/iocore/eventsystem/I_EThread.h
+++ b/iocore/eventsystem/I_EThread.h
@@ -39,7 +39,7 @@
 // instead.
 #define MUTEX_RETRY_DELAY HRTIME_MSECONDS(20)
 
-struct DiskHandler;
+class DiskHandler;
 struct EventIO;
 
 class ServerSessionPool;
diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h
index cd6f1f788..66a029773 100644
--- a/iocore/net/P_UnixNet.h
+++ b/iocore/net/P_UnixNet.h
@@ -36,6 +36,7 @@
 #define EVENTIO_DNS_CONNECTION 3
 #define EVENTIO_UDP_CONNECTION 4
 #define EVENTIO_ASYNC_SIGNAL 5
+#define EVENTIO_DISK 6
 
 #if TS_USE_EPOLL
 #ifndef EPOLLEXCLUSIVE
@@ -77,6 +78,7 @@ typedef PollDescriptor *EventLoop;
 
 class NetEvent;
 class UnixUDPConnection;
+class DiskHandler;
 struct DNSConnection;
 struct NetAccept;
 
@@ -95,6 +97,7 @@ struct EventIO {
     DNSConnection *dnscon;
     NetAccept *na;
     UnixUDPConnection *uc;
+    DiskHandler *dh;
   } data; ///< a kind of continuation
 
   /** The start methods all logically Setup a class to be called
@@ -111,6 +114,7 @@ struct EventIO {
   int start(EventLoop l, NetEvent *ne, int events);
   int start(EventLoop l, UnixUDPConnection *vc, int events);
   int start(EventLoop l, int fd, NetEvent *ne, int events);
+  int start(EventLoop l, DiskHandler *dh);
   int start_common(EventLoop l, int fd, int events);
 
   /** Alter the events that will trigger the continuation, for level triggered 
I/O.
diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc
index f054e5697..1446b9502 100644
--- a/iocore/net/UnixNet.cc
+++ b/iocore/net/UnixNet.cc
@@ -22,6 +22,7 @@
  */
 
 #include "P_Net.h"
+#include "I_AIO.h"
 
 using namespace std::literals;
 
@@ -258,6 +259,9 @@ initialize_thread_for_net(EThread *thread)
   thread->ep->type = EVENTIO_ASYNC_SIGNAL;
 #if HAVE_EVENTFD
   thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ);
+#if TS_USE_LINUX_IO_URING
+  thread->ep->start(pd, DiskHandler::local_context());
+#endif
 #else
   thread->ep->start(pd, thread->evpipe[0], nullptr, EVENTIO_READ);
 #endif
@@ -486,12 +490,20 @@ int
 NetHandler::waitForActivity(ink_hrtime timeout)
 {
   EventIO *epd = nullptr;
+#if AIO_MODE == AIO_MODE_IO_URING
+  DiskHandler *dh = DiskHandler::local_context();
+  bool servicedh  = false;
+#endif
 
   NET_INCREMENT_DYN_STAT(net_handler_run_stat);
   SCOPED_MUTEX_LOCK(lock, mutex, this->thread);
 
   process_enabled_list();
 
+#if AIO_MODE == AIO_MODE_IO_URING
+  dh->submit();
+#endif
+
   // Polling event by PollCont
   PollCont *p = get_PollCont(this->thread);
   p->do_poll(timeout);
@@ -543,6 +555,10 @@ NetHandler::waitForActivity(ink_hrtime timeout)
       net_signal_hook_callback(this->thread);
     } else if (epd->type == EVENTIO_NETACCEPT) {
       this->thread->schedule_imm(epd->data.na);
+#if AIO_MODE == AIO_MODE_IO_URING
+    } else if (epd->type == EVENTIO_DISK) {
+      servicedh = true;
+#endif
     }
     ev_next_event(pd, x);
   }
@@ -551,6 +567,12 @@ NetHandler::waitForActivity(ink_hrtime timeout)
 
   process_ready_list();
 
+#if AIO_MODE == AIO_MODE_IO_URING
+  if (servicedh) {
+    dh->service();
+  }
+#endif
+
   return EVENT_CONT;
 }
 
@@ -782,3 +804,16 @@ NetHandler::remove_from_active_queue(NetEvent *ne)
     --active_queue_size;
   }
 }
+
+int
+EventIO::start(EventLoop l, DiskHandler *dh)
+{
+#if AIO_MODE == AIO_MODE_IO_URING
+  data.dh = dh;
+  int fd  = dh->register_eventfd();
+  type    = EVENTIO_DISK;
+  return start_common(l, fd, EVENTIO_READ);
+#else
+  return 1;
+#endif
+}
diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc
index 1548daf7b..0ad805de6 100644
--- a/mgmt/RecordsConfig.cc
+++ b/mgmt/RecordsConfig.cc
@@ -1558,6 +1558,18 @@ static const RecordElement RecordsConfig[] =
   //#
   //###########
   {RECT_CONFIG, "proxy.config.http.host_sni_policy", RECD_INT, "2", RECU_NULL, 
RR_NULL, RECC_NULL, "[0-2]", RECA_NULL},
+
+  //###########
+  //#
+  //# AIO specific configuration
+  //#
+  //###########
+  {RECT_CONFIG, "proxy.config.aio.io_uring.entries", RECD_INT, "1024", 
RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL},
+  {RECT_CONFIG, "proxy.config.aio.io_uring.sq_poll_ms", RECD_INT, "0", 
RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL},
+  {RECT_CONFIG, "proxy.config.aio.io_uring.attach_wq", RECD_INT, "0", 
RECU_NULL, RR_NULL, RECC_NULL, "[0-1]", RECA_NULL},
+  {RECT_CONFIG, "proxy.config.aio.io_uring.wq_workers_bounded", RECD_INT, "0", 
RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL},
+  {RECT_CONFIG, "proxy.config.aio.io_uring.wq_workers_unbounded", RECD_INT, 
"0", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL},
+
 };
 // clang-format on
 
diff --git a/src/traffic_server/InkAPI.cc b/src/traffic_server/InkAPI.cc
index f470bbb8d..15ed96267 100644
--- a/src/traffic_server/InkAPI.cc
+++ b/src/traffic_server/InkAPI.cc
@@ -8588,7 +8588,7 @@ TSAIOWrite(int fd, off_t offset, char *buf, const size_t 
bufSize, TSCont contp)
 TSReturnCode
 TSAIOThreadNumSet(int thread_num)
 {
-#if AIO_MODE == AIO_MODE_NATIVE
+#if AIO_MODE == AIO_MODE_NATIVE || AIO_MODE == AIO_MODE_IO_URING
   (void)thread_num;
   return TS_SUCCESS;
 #else
diff --git a/src/traffic_server/Makefile.inc b/src/traffic_server/Makefile.inc
index caf65f140..c816e3514 100644
--- a/src/traffic_server/Makefile.inc
+++ b/src/traffic_server/Makefile.inc
@@ -91,6 +91,7 @@ traffic_server_traffic_server_LDADD = \
        @OPENSSL_LIBS@ \
        @YAMLCPP_LIBS@ \
        @BORINGOCSP_LIBS@ \
+       @URING_LIBS@ \
        -lm
 
 if IS_DARWIN
diff --git a/src/traffic_server/traffic_server.cc 
b/src/traffic_server/traffic_server.cc
index 7b563eb8f..c5c44fa07 100644
--- a/src/traffic_server/traffic_server.cc
+++ b/src/traffic_server/traffic_server.cc
@@ -1856,7 +1856,7 @@ main(int /* argc ATS_UNUSED */, const char **argv)
   // This call is required for win_9xMe
   // without this this_ethread() is failing when
   // start_HttpProxyServer is called from main thread
-  Thread *main_thread = new EThread;
+  EThread *main_thread = new EThread;
   main_thread->set_specific();
 
   // Re-initialize diagsConfig based on records.config configuration
@@ -2012,6 +2012,14 @@ main(int /* argc ATS_UNUSED */, const char **argv)
     proxyServerCheck.notify_one();
   }
 
+#if TS_USE_LINUX_IO_URING == 1
+  Note("Using io_uring for AIO");
+  DiskHandler *main_aio = DiskHandler::local_context();
+  DiskHandler::set_main_queue(main_aio);
+  auto [bounded, unbounded] = main_aio->get_wq_max_workers();
+  Note("io_uring: WQ workers - bounded = %d, unbounded = %d", bounded, 
unbounded);
+#endif
+
   // !! ET_NET threads start here !!
   // This means any spawn scheduling must be done before this point.
   eventProcessor.start(num_of_net_threads, stacksize);
@@ -2223,7 +2231,11 @@ main(int /* argc ATS_UNUSED */, const char **argv)
   TSSystemState::initialization_done();
 
   while (!TSSystemState::is_event_system_shut_down()) {
+#if TS_USE_LINUX_IO_URING == 1
+    main_aio->submit_and_wait(1000);
+#else
     sleep(1);
+#endif
   }
 
   delete main_thread;

Reply via email to