This is an automated email from the ASF dual-hosted git repository. cmcfarlen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/trafficserver.git
The following commit(s) were added to refs/heads/master by this push: new 2bd6ff422 Experimental io_uring AIO implementation for disk IO (#8992) 2bd6ff422 is described below commit 2bd6ff4225425a1ca4bb5139d9f1a05fed003f69 Author: Chris McFarlen <ch...@mcfarlen.us> AuthorDate: Wed Oct 26 10:05:43 2022 -0500 Experimental io_uring AIO implementation for disk IO (#8992) * add io_uring feature to build * link liburing library. Start adding implementation * rename to DiskHandler to match native aio * better way to integrate with net * wip * add diskhandler to main thread, link sqe ops for aio ops with multiple operations * cleanup * build should work without io_uring configured * add config * Finish config, attach parent wq, configure bounded/unbounded queues * restore accidental line removal * remove unused include * set io opcode and increment stats * Forward declare DiskHandler * io_uring stats * fix AIO_STAT_COUNT * fix newline at end of file Co-authored-by: Chris McFarlen <cmcfar...@apple.com> --- configure.ac | 28 ++++ include/tscore/ink_config.h.in | 1 + iocore/aio/AIO.cc | 276 +++++++++++++++++++++++++++++++++-- iocore/aio/I_AIO.h | 57 +++++++- iocore/aio/P_AIO.h | 4 + iocore/eventsystem/I_EThread.h | 2 +- iocore/net/P_UnixNet.h | 4 + iocore/net/UnixNet.cc | 35 +++++ mgmt/RecordsConfig.cc | 12 ++ src/traffic_server/InkAPI.cc | 2 +- src/traffic_server/Makefile.inc | 1 + src/traffic_server/traffic_server.cc | 14 +- 12 files changed, 417 insertions(+), 19 deletions(-) diff --git a/configure.ac b/configure.ac index 6f7f268f8..23ca94fdc 100644 --- a/configure.ac +++ b/configure.ac @@ -1687,6 +1687,34 @@ AS_IF([test "x$enable_linux_native_aio" = "xyes"], [ AC_MSG_RESULT([$enable_linux_native_aio]) TS_ARG_ENABLE_VAR([use], [linux_native_aio]) +# Check for enabling io_uring on linux + +AC_MSG_CHECKING([whether to enable Linux io_uring]) +AC_ARG_ENABLE([experimental-linux-io-uring], + [AS_HELP_STRING([--enable-experimental-linux-io-uring], [WARNING this is experimental: enable Linux io_uring support @<:@default=no@:>@])], + [enable_linux_io_uring="${enableval}"], + [enable_linux_io_uring=no] +) + +AS_IF([test "x$enable_linux_io_uring" = "xyes"], [ + URING_LIBS="-luring" + if test $host_os_def != "linux"; then + AC_MSG_ERROR([Linux io_uring can only be enabled on Linux systems]) + fi + + AC_CHECK_HEADERS([liburing.h], [], + [AC_MSG_ERROR([Linux io_uring requires liburing.h (install liburing)])] + ) + + AC_SEARCH_LIBS([io_uring_queue_init], [uring], [AC_SUBST([URING_LIBS])], + [AC_MSG_ERROR([Linux io_uring require uring])] + ) +]) + +AC_MSG_RESULT([$enable_linux_io_uring]) +TS_ARG_ENABLE_VAR([use], [linux_io_uring]) + + # Check for hwloc library. # If we don't find it, disable checking for header. use_hwloc=0 diff --git a/include/tscore/ink_config.h.in b/include/tscore/ink_config.h.in index af3a0bbf3..30737e19c 100644 --- a/include/tscore/ink_config.h.in +++ b/include/tscore/ink_config.h.in @@ -80,6 +80,7 @@ #define TS_USE_TLS_SET_CIPHERSUITES @use_tls_set_ciphersuites@ #define TS_HAS_TLS_KEYLOGGING @has_tls_keylogging@ #define TS_USE_LINUX_NATIVE_AIO @use_linux_native_aio@ +#define TS_USE_LINUX_IO_URING @use_linux_io_uring@ #define TS_USE_REMOTE_UNWINDING @use_remote_unwinding@ #define TS_USE_TLS_OCSP @use_tls_ocsp@ #define TS_HAS_TLS_EARLY_DATA @has_tls_early_data@ diff --git a/iocore/aio/AIO.cc b/iocore/aio/AIO.cc index d76ed0bf0..078d2b58d 100644 --- a/iocore/aio/AIO.cc +++ b/iocore/aio/AIO.cc @@ -28,8 +28,14 @@ #include "tscore/TSSystemState.h" #include "tscore/ink_hw.h" +#include <atomic> + #include "P_AIO.h" +#if AIO_MODE == AIO_MODE_IO_URING +#include <sys/eventfd.h> +#endif + #if AIO_MODE == AIO_MODE_NATIVE #define AIO_PERIOD -HRTIME_MSECONDS(10) #else @@ -44,22 +50,35 @@ AIO_Reqs *aio_reqs[MAX_DISKS_POSSIBLE]; /* number of unique file descriptors in the aio_reqs array */ int num_filedes = 1; +#if AIO_MODE == AIO_MODE_THREAD // acquire this mutex before inserting a new entry in the aio_reqs array. // Don't need to acquire this for searching the array static ink_mutex insert_mutex; +#endif int thread_is_created = 0; #endif // AIO_MODE == AIO_MODE_NATIVE RecInt cache_config_threads_per_disk = 12; RecInt api_config_threads_per_disk = 12; +// config for io_uring mode +#if AIO_MODE == AIO_MODE_IO_URING +RecInt aio_io_uring_queue_entries = 1024; +RecInt aio_io_uring_sq_poll_ms = 0; +RecInt aio_io_uring_attach_wq = 0; +RecInt aio_io_uring_wq_bounded = 0; +RecInt aio_io_uring_wq_unbounded = 0; +#endif + RecRawStatBlock *aio_rsb = nullptr; Continuation *aio_err_callbck = nullptr; // AIO Stats -uint64_t aio_num_read = 0; -uint64_t aio_bytes_read = 0; -uint64_t aio_num_write = 0; -uint64_t aio_bytes_written = 0; +std::atomic<uint64_t> aio_num_read = 0; +std::atomic<uint64_t> aio_bytes_read = 0; +std::atomic<uint64_t> aio_num_write = 0; +std::atomic<uint64_t> aio_bytes_written = 0; +std::atomic<uint64_t> io_uring_submissions = 0; +std::atomic<uint64_t> io_uring_completions = 0; /* * Stats @@ -88,17 +107,25 @@ aio_stats_cb(const char * /* name ATS_UNUSED */, RecDataT data_type, RecData *da } switch (id) { case AIO_STAT_READ_PER_SEC: - new_val = aio_num_read; + new_val = aio_num_read.load(); break; case AIO_STAT_WRITE_PER_SEC: - new_val = aio_num_write; + new_val = aio_num_write.load(); break; case AIO_STAT_KB_READ_PER_SEC: - new_val = aio_bytes_read >> 10; + new_val = aio_bytes_read.load() >> 10; break; case AIO_STAT_KB_WRITE_PER_SEC: - new_val = aio_bytes_written >> 10; + new_val = aio_bytes_written.load() >> 10; + break; +#if AIO_MODE == AIO_MODE_IO_URING + case AIO_STAT_IO_URING_SUBMITTED: + new_val = io_uring_submissions.load(); break; + case AIO_STAT_IO_URING_COMPLETED: + new_val = io_uring_completions.load(); + break; +#endif default: ink_assert(0); } @@ -159,7 +186,13 @@ ink_aio_init(ts::ModuleVersion v) (int)AIO_STAT_KB_READ_PER_SEC, aio_stats_cb); RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.cache.KB_write_per_sec", RECD_FLOAT, RECP_PERSISTENT, (int)AIO_STAT_KB_WRITE_PER_SEC, aio_stats_cb); -#if AIO_MODE != AIO_MODE_NATIVE +#if TS_USE_LINUX_IO_URING + RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.io_uring.submitted", RECD_FLOAT, RECP_PERSISTENT, + (int)AIO_STAT_IO_URING_SUBMITTED, aio_stats_cb); + RecRegisterRawStat(aio_rsb, RECT_PROCESS, "proxy.process.io_uring.completed", RECD_FLOAT, RECP_PERSISTENT, + (int)AIO_STAT_IO_URING_COMPLETED, aio_stats_cb); +#endif +#if AIO_MODE == AIO_MODE_THREAD memset(&aio_reqs, 0, MAX_DISKS_POSSIBLE * sizeof(AIO_Reqs *)); ink_mutex_init(&insert_mutex); #endif @@ -167,6 +200,14 @@ ink_aio_init(ts::ModuleVersion v) #if TS_USE_LINUX_NATIVE_AIO Warning("Running with Linux AIO, there are known issues with this feature"); #endif + +#if TS_USE_LINUX_IO_URING + REC_ReadConfigInteger(aio_io_uring_queue_entries, "proxy.config.aio.io_uring.entries"); + REC_ReadConfigInteger(aio_io_uring_sq_poll_ms, "proxy.config.aio.io_uring.sq_poll_ms"); + REC_ReadConfigInteger(aio_io_uring_attach_wq, "proxy.config.aio.io_uring.attach_wq"); + REC_ReadConfigInteger(aio_io_uring_wq_bounded, "proxy.config.aio.io_uring.wq_workers_bounded"); + REC_ReadConfigInteger(aio_io_uring_wq_unbounded, "proxy.config.aio.io_uring.wq_workers_unbounded"); +#endif } int @@ -179,7 +220,7 @@ ink_aio_start() return 0; } -#if AIO_MODE != AIO_MODE_NATIVE +#if AIO_MODE == AIO_MODE_THREAD struct AIOThreadInfo : public Continuation { AIO_Reqs *req; @@ -460,11 +501,11 @@ AIOThreadInfo::aio_thread_main(AIOThreadInfo *thr_info) #endif // update the stats; if (op->aiocb.aio_lio_opcode == LIO_WRITE) { - aio_num_write++; - aio_bytes_written += op->aiocb.aio_nbytes; + aio_num_write.fetch_add(1); + aio_bytes_written.fetch_add(op->aiocb.aio_nbytes); } else { - aio_num_read++; - aio_bytes_read += op->aiocb.aio_nbytes; + aio_num_read.fetch_add(1); + aio_bytes_read.fetch_add(op->aiocb.aio_nbytes); } ink_mutex_release(¤t_req->aio_mutex); cache_op((AIOCallbackInternal *)op); @@ -490,6 +531,213 @@ AIOThreadInfo::aio_thread_main(AIOThreadInfo *thr_info) } return nullptr; } + +#elif AIO_MODE == AIO_MODE_IO_URING + +std::atomic<int> aio_main_wq_fd; + +DiskHandler::DiskHandler() +{ + io_uring_params p{}; + + if (aio_io_uring_attach_wq > 0) { + int wq_fd = get_main_queue_fd(); + if (wq_fd > 0) { + p.flags = IORING_SETUP_ATTACH_WQ; + p.wq_fd = wq_fd; + } + } + + if (aio_io_uring_sq_poll_ms > 0) { + p.flags |= IORING_SETUP_SQPOLL; + p.sq_thread_idle = aio_io_uring_sq_poll_ms; + } + + int ret = io_uring_queue_init_params(aio_io_uring_queue_entries, &ring, &p); + if (ret < 0) { + throw std::runtime_error(strerror(-ret)); + } + + /* no sharing for non-fixed either */ + if (aio_io_uring_sq_poll_ms && !(p.features & IORING_FEAT_SQPOLL_NONFIXED)) { + throw std::runtime_error("No SQPOLL sharing with nonfixed"); + } + + // assign this handler to the thread + // TODO(cmcfarlen): maybe a bad place for this! + this_ethread()->diskHandler = this; +} + +DiskHandler::~DiskHandler() +{ + io_uring_queue_exit(&ring); +} + +void +DiskHandler::set_main_queue(DiskHandler *dh) +{ + dh->set_wq_max_workers(aio_io_uring_wq_bounded, aio_io_uring_wq_unbounded); + aio_main_wq_fd.store(dh->ring.ring_fd); +} + +int +DiskHandler::get_main_queue_fd() +{ + return aio_main_wq_fd.load(); +} + +int +DiskHandler::set_wq_max_workers(unsigned int bounded, unsigned int unbounded) +{ + if (bounded == 0 && unbounded == 0) { + return 0; + } + unsigned int args[2] = {bounded, unbounded}; + int result = io_uring_register_iowq_max_workers(&ring, args); + return result; +} + +std::pair<int, int> +DiskHandler::get_wq_max_workers() +{ + unsigned int args[2] = {0, 0}; + io_uring_register_iowq_max_workers(&ring, args); + return std::make_pair(args[0], args[1]); +} + +void +DiskHandler::submit() +{ + io_uring_submissions.fetch_add(io_uring_submit(&ring)); +} + +void +DiskHandler::handle_cqe(io_uring_cqe *cqe) +{ + AIOCallback *op = static_cast<AIOCallback *>(io_uring_cqe_get_data(cqe)); + + op->aio_result = static_cast<int64_t>(cqe->res); + op->link.prev = nullptr; + op->link.next = nullptr; + op->mutex = op->action.mutex; + + if (op->aiocb.aio_lio_opcode == LIO_WRITE) { + aio_num_write.fetch_add(1); + aio_bytes_written.fetch_add(op->aiocb.aio_nbytes); + } else { + aio_num_read.fetch_add(1); + aio_bytes_read.fetch_add(op->aiocb.aio_nbytes); + } + + // the last op in the linked ops will have the original op stored in the aiocb + if (op->aiocb.aio_op) { + op = op->aiocb.aio_op; + if (op->thread == AIO_CALLBACK_THREAD_AIO) { + SCOPED_MUTEX_LOCK(lock, op->mutex, this_ethread()); + op->handleEvent(EVENT_NONE, nullptr); + } else if (op->thread == AIO_CALLBACK_THREAD_ANY) { + eventProcessor.schedule_imm(op); + } else { + op->thread->schedule_imm(op); + } + } +} + +void +DiskHandler::service() +{ + io_uring_cqe *cqe = nullptr; + io_uring_peek_cqe(&ring, &cqe); + while (cqe) { + handle_cqe(cqe); + io_uring_completions.fetch_add(1); + io_uring_cqe_seen(&ring, cqe); + + cqe = nullptr; + io_uring_peek_cqe(&ring, &cqe); + } +} + +void +DiskHandler::submit_and_wait(int ms) +{ + ink_hrtime t = ink_hrtime_from_msec(ms); + timespec ts = ink_hrtime_to_timespec(t); + __kernel_timespec timeout = {ts.tv_sec, ts.tv_nsec}; + io_uring_cqe *cqe = nullptr; + + io_uring_submit_and_wait_timeout(&ring, &cqe, 1, &timeout, nullptr); + while (cqe) { + handle_cqe(cqe); + io_uring_completions.fetch_add(1); + io_uring_cqe_seen(&ring, cqe); + + cqe = nullptr; + io_uring_peek_cqe(&ring, &cqe); + } +} + +int +DiskHandler::register_eventfd() +{ + int fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); + + io_uring_register_eventfd(&ring, fd); + + return fd; +} + +DiskHandler * +DiskHandler::local_context() +{ + // TODO(cmcfarlen): load config + thread_local DiskHandler threadContext; + + return &threadContext; +} + +int +ink_aio_read(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */) +{ + EThread *t = this_ethread(); + AIOCallback *op = op_in; + while (op) { + io_uring_sqe *sqe = t->diskHandler->next_sqe(); + io_uring_prep_read(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset); + io_uring_sqe_set_data(sqe, op); + op->aiocb.aio_lio_opcode = LIO_READ; + if (op->then) { + sqe->flags |= IOSQE_IO_LINK; + } else { + op->aiocb.aio_op = op_in; + } + + op = op->then; + } + return 1; +} + +int +ink_aio_write(AIOCallback *op_in, int /* fromAPI ATS_UNUSED */) +{ + EThread *t = this_ethread(); + AIOCallback *op = op_in; + while (op) { + io_uring_sqe *sqe = t->diskHandler->next_sqe(); + io_uring_prep_write(sqe, op->aiocb.aio_fildes, op->aiocb.aio_buf, op->aiocb.aio_nbytes, op->aiocb.aio_offset); + io_uring_sqe_set_data(sqe, op); + op->aiocb.aio_lio_opcode = LIO_WRITE; + if (op->then) { + sqe->flags |= IOSQE_IO_LINK; + } else { + op->aiocb.aio_op = op_in; + } + + op = op->then; + } + return 1; +} + #else int DiskHandler::startAIOEvent(int /* event ATS_UNUSED */, Event *e) diff --git a/iocore/aio/I_AIO.h b/iocore/aio/I_AIO.h index 6144e188f..40a41db34 100644 --- a/iocore/aio/I_AIO.h +++ b/iocore/aio/I_AIO.h @@ -40,9 +40,12 @@ static constexpr ts::ModuleVersion AIO_MODULE_PUBLIC_VERSION(1, 0, ts::ModuleVer #define AIO_MODE_THREAD 0 #define AIO_MODE_NATIVE 1 +#define AIO_MODE_IO_URING 2 #if TS_USE_LINUX_NATIVE_AIO #define AIO_MODE AIO_MODE_NATIVE +#elif TS_USE_LINUX_IO_URING +#define AIO_MODE AIO_MODE_IO_URING #else #define AIO_MODE AIO_MODE_THREAD #endif @@ -64,6 +67,21 @@ typedef struct io_event ink_io_event_t; #define aio_offset u.c.offset #define aio_buf u.c.buf +#elif AIO_MODE == AIO_MODE_IO_URING +#include <liburing.h> + +struct AIOCallback; +struct ink_aiocb { + int aio_fildes = -1; /* file descriptor or status: AIO_NOT_IN_PROGRESS */ + void *aio_buf = nullptr; /* buffer location */ + size_t aio_nbytes = 0; /* length of transfer */ + off_t aio_offset = 0; /* file offset */ + + int aio_lio_opcode = 0; /* listio operation */ + int aio_state = 0; /* state flag for List I/O */ + AIOCallback *aio_op = nullptr; +}; + #else struct ink_aiocb { @@ -135,14 +153,49 @@ struct DiskHandler : public Continuation { }; #endif +#if AIO_MODE == AIO_MODE_IO_URING + +class DiskHandler +{ +public: + DiskHandler(); + ~DiskHandler(); + + io_uring_sqe * + next_sqe() + { + return io_uring_get_sqe(&ring); + } + + int set_wq_max_workers(unsigned int bounded, unsigned int unbounded); + std::pair<int, int> get_wq_max_workers(); + + void submit(); + void service(); + void submit_and_wait(int ms); + + int register_eventfd(); + + static DiskHandler *local_context(); + static void set_main_queue(DiskHandler *); + static int get_main_queue_fd(); + +private: + io_uring ring; + + void handle_cqe(io_uring_cqe *); +}; + +#endif + void ink_aio_init(ts::ModuleVersion version); int ink_aio_start(); void ink_aio_set_callback(Continuation *error_callback); int ink_aio_read(AIOCallback *op, - int fromAPI = 0); // fromAPI is a boolean to indicate if this is from a API call such as upload proxy feature + int fromAPI = 0); // fromAPI is a boolean to indicate if this is from an API call such as upload proxy feature int ink_aio_write(AIOCallback *op, int fromAPI = 0); int ink_aio_readv(AIOCallback *op, - int fromAPI = 0); // fromAPI is a boolean to indicate if this is from a API call such as upload proxy feature + int fromAPI = 0); // fromAPI is a boolean to indicate if this is from an API call such as upload proxy feature int ink_aio_writev(AIOCallback *op, int fromAPI = 0); AIOCallback *new_AIOCallback(); diff --git a/iocore/aio/P_AIO.h b/iocore/aio/P_AIO.h index 18a8690e3..d7b7fd25f 100644 --- a/iocore/aio/P_AIO.h +++ b/iocore/aio/P_AIO.h @@ -148,6 +148,10 @@ enum aio_stat_enum { AIO_STAT_KB_READ_PER_SEC, AIO_STAT_WRITE_PER_SEC, AIO_STAT_KB_WRITE_PER_SEC, +#if AIO_MODE == AIO_MODE_IO_URING + AIO_STAT_IO_URING_SUBMITTED, + AIO_STAT_IO_URING_COMPLETED, +#endif AIO_STAT_COUNT }; extern RecRawStatBlock *aio_rsb; diff --git a/iocore/eventsystem/I_EThread.h b/iocore/eventsystem/I_EThread.h index 16ef7571f..65d43ce26 100644 --- a/iocore/eventsystem/I_EThread.h +++ b/iocore/eventsystem/I_EThread.h @@ -39,7 +39,7 @@ // instead. #define MUTEX_RETRY_DELAY HRTIME_MSECONDS(20) -struct DiskHandler; +class DiskHandler; struct EventIO; class ServerSessionPool; diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h index cd6f1f788..66a029773 100644 --- a/iocore/net/P_UnixNet.h +++ b/iocore/net/P_UnixNet.h @@ -36,6 +36,7 @@ #define EVENTIO_DNS_CONNECTION 3 #define EVENTIO_UDP_CONNECTION 4 #define EVENTIO_ASYNC_SIGNAL 5 +#define EVENTIO_DISK 6 #if TS_USE_EPOLL #ifndef EPOLLEXCLUSIVE @@ -77,6 +78,7 @@ typedef PollDescriptor *EventLoop; class NetEvent; class UnixUDPConnection; +class DiskHandler; struct DNSConnection; struct NetAccept; @@ -95,6 +97,7 @@ struct EventIO { DNSConnection *dnscon; NetAccept *na; UnixUDPConnection *uc; + DiskHandler *dh; } data; ///< a kind of continuation /** The start methods all logically Setup a class to be called @@ -111,6 +114,7 @@ struct EventIO { int start(EventLoop l, NetEvent *ne, int events); int start(EventLoop l, UnixUDPConnection *vc, int events); int start(EventLoop l, int fd, NetEvent *ne, int events); + int start(EventLoop l, DiskHandler *dh); int start_common(EventLoop l, int fd, int events); /** Alter the events that will trigger the continuation, for level triggered I/O. diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc index f054e5697..1446b9502 100644 --- a/iocore/net/UnixNet.cc +++ b/iocore/net/UnixNet.cc @@ -22,6 +22,7 @@ */ #include "P_Net.h" +#include "I_AIO.h" using namespace std::literals; @@ -258,6 +259,9 @@ initialize_thread_for_net(EThread *thread) thread->ep->type = EVENTIO_ASYNC_SIGNAL; #if HAVE_EVENTFD thread->ep->start(pd, thread->evfd, nullptr, EVENTIO_READ); +#if TS_USE_LINUX_IO_URING + thread->ep->start(pd, DiskHandler::local_context()); +#endif #else thread->ep->start(pd, thread->evpipe[0], nullptr, EVENTIO_READ); #endif @@ -486,12 +490,20 @@ int NetHandler::waitForActivity(ink_hrtime timeout) { EventIO *epd = nullptr; +#if AIO_MODE == AIO_MODE_IO_URING + DiskHandler *dh = DiskHandler::local_context(); + bool servicedh = false; +#endif NET_INCREMENT_DYN_STAT(net_handler_run_stat); SCOPED_MUTEX_LOCK(lock, mutex, this->thread); process_enabled_list(); +#if AIO_MODE == AIO_MODE_IO_URING + dh->submit(); +#endif + // Polling event by PollCont PollCont *p = get_PollCont(this->thread); p->do_poll(timeout); @@ -543,6 +555,10 @@ NetHandler::waitForActivity(ink_hrtime timeout) net_signal_hook_callback(this->thread); } else if (epd->type == EVENTIO_NETACCEPT) { this->thread->schedule_imm(epd->data.na); +#if AIO_MODE == AIO_MODE_IO_URING + } else if (epd->type == EVENTIO_DISK) { + servicedh = true; +#endif } ev_next_event(pd, x); } @@ -551,6 +567,12 @@ NetHandler::waitForActivity(ink_hrtime timeout) process_ready_list(); +#if AIO_MODE == AIO_MODE_IO_URING + if (servicedh) { + dh->service(); + } +#endif + return EVENT_CONT; } @@ -782,3 +804,16 @@ NetHandler::remove_from_active_queue(NetEvent *ne) --active_queue_size; } } + +int +EventIO::start(EventLoop l, DiskHandler *dh) +{ +#if AIO_MODE == AIO_MODE_IO_URING + data.dh = dh; + int fd = dh->register_eventfd(); + type = EVENTIO_DISK; + return start_common(l, fd, EVENTIO_READ); +#else + return 1; +#endif +} diff --git a/mgmt/RecordsConfig.cc b/mgmt/RecordsConfig.cc index 1548daf7b..0ad805de6 100644 --- a/mgmt/RecordsConfig.cc +++ b/mgmt/RecordsConfig.cc @@ -1558,6 +1558,18 @@ static const RecordElement RecordsConfig[] = //# //########### {RECT_CONFIG, "proxy.config.http.host_sni_policy", RECD_INT, "2", RECU_NULL, RR_NULL, RECC_NULL, "[0-2]", RECA_NULL}, + + //########### + //# + //# AIO specific configuration + //# + //########### + {RECT_CONFIG, "proxy.config.aio.io_uring.entries", RECD_INT, "1024", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}, + {RECT_CONFIG, "proxy.config.aio.io_uring.sq_poll_ms", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}, + {RECT_CONFIG, "proxy.config.aio.io_uring.attach_wq", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, "[0-1]", RECA_NULL}, + {RECT_CONFIG, "proxy.config.aio.io_uring.wq_workers_bounded", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}, + {RECT_CONFIG, "proxy.config.aio.io_uring.wq_workers_unbounded", RECD_INT, "0", RECU_NULL, RR_NULL, RECC_NULL, nullptr, RECA_NULL}, + }; // clang-format on diff --git a/src/traffic_server/InkAPI.cc b/src/traffic_server/InkAPI.cc index f470bbb8d..15ed96267 100644 --- a/src/traffic_server/InkAPI.cc +++ b/src/traffic_server/InkAPI.cc @@ -8588,7 +8588,7 @@ TSAIOWrite(int fd, off_t offset, char *buf, const size_t bufSize, TSCont contp) TSReturnCode TSAIOThreadNumSet(int thread_num) { -#if AIO_MODE == AIO_MODE_NATIVE +#if AIO_MODE == AIO_MODE_NATIVE || AIO_MODE == AIO_MODE_IO_URING (void)thread_num; return TS_SUCCESS; #else diff --git a/src/traffic_server/Makefile.inc b/src/traffic_server/Makefile.inc index caf65f140..c816e3514 100644 --- a/src/traffic_server/Makefile.inc +++ b/src/traffic_server/Makefile.inc @@ -91,6 +91,7 @@ traffic_server_traffic_server_LDADD = \ @OPENSSL_LIBS@ \ @YAMLCPP_LIBS@ \ @BORINGOCSP_LIBS@ \ + @URING_LIBS@ \ -lm if IS_DARWIN diff --git a/src/traffic_server/traffic_server.cc b/src/traffic_server/traffic_server.cc index 7b563eb8f..c5c44fa07 100644 --- a/src/traffic_server/traffic_server.cc +++ b/src/traffic_server/traffic_server.cc @@ -1856,7 +1856,7 @@ main(int /* argc ATS_UNUSED */, const char **argv) // This call is required for win_9xMe // without this this_ethread() is failing when // start_HttpProxyServer is called from main thread - Thread *main_thread = new EThread; + EThread *main_thread = new EThread; main_thread->set_specific(); // Re-initialize diagsConfig based on records.config configuration @@ -2012,6 +2012,14 @@ main(int /* argc ATS_UNUSED */, const char **argv) proxyServerCheck.notify_one(); } +#if TS_USE_LINUX_IO_URING == 1 + Note("Using io_uring for AIO"); + DiskHandler *main_aio = DiskHandler::local_context(); + DiskHandler::set_main_queue(main_aio); + auto [bounded, unbounded] = main_aio->get_wq_max_workers(); + Note("io_uring: WQ workers - bounded = %d, unbounded = %d", bounded, unbounded); +#endif + // !! ET_NET threads start here !! // This means any spawn scheduling must be done before this point. eventProcessor.start(num_of_net_threads, stacksize); @@ -2223,7 +2231,11 @@ main(int /* argc ATS_UNUSED */, const char **argv) TSSystemState::initialization_done(); while (!TSSystemState::is_event_system_shut_down()) { +#if TS_USE_LINUX_IO_URING == 1 + main_aio->submit_and_wait(1000); +#else sleep(1); +#endif } delete main_thread;