Hi community, I would like to ask you to test and possibly adopt the attached patch. On my system it reduces the CPU load of the AmRtpProcessor thread by a factor of four (I tested with 200 simultaneous calls).
The changes consist of using epoll instead of poll and avoiding a search by hash on every RTP packet. -- br -Michael Furmur
>From 3743d6d8ea00444412e1f67a759a270775c47236 Mon Sep 17 00:00:00 2001 From: murfur <[email protected]> Date: Thu, 20 Feb 2014 22:49:56 +0200 Subject: [PATCH] add epoll support for AmRtpReceiver --- CMakeLists.txt | 12 ++++++ Makefile.defs | 12 ++++++ core/AmRtpReceiver.cpp | 100 +++++++++++++++++++++++++++++++++++++++++++++---- core/AmRtpReceiver.h | 24 ++++++++++-- 4 files changed, 138 insertions(+), 10 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 2e809a1..3da366d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,6 +45,7 @@ OPTION(SEMS_USE_TTS "Build with Text-to-speech support (requires Flite OPTION(SEMS_USE_OPENSSL "Build with OpenSSL" OFF) OPTION(SEMS_USE_MONITORING "Build with monitoring support" OFF) OPTION(SEMS_USE_IPV6 "Build with IPv6 support" OFF) +OPTION(SEMS_USE_EPOLL "Build with epoll support" ON) OPTION(MAX_RTP_SESSIONS:int "How many rtp sessions SEMS will handle simultaneously? (default 2048)" 2048) # Fix weird static libs handling in old CMake @@ -149,6 +150,17 @@ ELSE(SEMS_USE_IPV6) MESSAGE(STATUS "Enable IPv6 support: NO (default)") ENDIF(SEMS_USE_IPV6) +IF(SEMS_USE_EPOLL) + IF(${CMAKE_SYSTEM_NAME} MATCHES "Linux") + ADD_DEFINITIONS(-DUSE_EPOLL) + MESSAGE(STATUS "Enable epoll support: YES (default)") + ELSE(${CMAKE_SYSTEM_NAME} MATCHES "Linux") + MESSAGE(STATUS "Enable epoll support: NO (not supported)") + ENDIF(${CMAKE_SYSTEM_NAME} MATCHES "Linux") +ELSE(SEMS_USE_EPOLL) + MESSAGE(STATUS "Enable epoll support: NO") +ENDIF(SEMS_USE_EPOLL) + # Let's try to find GSM library and header files FIND_PACKAGE(Gsm) IF(GSM_FOUND) diff --git a/Makefile.defs b/Makefile.defs index 0131d76..d277857 100644 --- a/Makefile.defs +++ b/Makefile.defs @@ -86,6 +86,11 @@ exclude_core_modules = g729 silk # USE_MONITORING=yes +# build with support for epoll +# +# +USE_EPOLL=yes + # Support for long debug messages? 
(useful for debugging SIP messages' contents) # # disable for slight performance gain @@ -123,6 +128,12 @@ ifdef USE_MONITORING CPPFLAGS += -DUSE_MONITORING endif +ifdef USE_EPOLL +ifeq ($(OS), linux) +CPPFLAGS += -DUSE_EPOLL +endif +endif + ifndef LONG_DEBUG_MESSAGE CPPFLAGS += -DLOG_BUFFER_LEN=2048 endif @@ -343,6 +354,7 @@ export USE_SPANDSP LIBSPANDSP_STATIC LIBSPANDSP_LDIR export USE_LIBSAMPLERATE USE_INTERNAL_RESAMPLER export WITH_ZRTP export USE_MONITORING +export USE_EPOLL export exclude_core_modules exclude_app_modules endif # ifeq ($(makefile_defs, 1) diff --git a/core/AmRtpReceiver.cpp b/core/AmRtpReceiver.cpp index f890bc1..bdf559f 100644 --- a/core/AmRtpReceiver.cpp +++ b/core/AmRtpReceiver.cpp @@ -39,13 +39,21 @@ #endif #include <sys/time.h> +#ifdef USE_EPOLL +#include <sys/epoll.h> +#else #include <sys/poll.h> +#endif #ifndef MAX_RTP_SESSIONS #define MAX_RTP_SESSIONS 2048 #endif +#ifdef USE_EPOLL +#define RTP_POLL_TIMEOUT 500 /*500 ms*/ +#else #define RTP_POLL_TIMEOUT 50 /*50 ms*/ +#endif _AmRtpReceiver::_AmRtpReceiver() @@ -61,14 +69,21 @@ _AmRtpReceiver::~_AmRtpReceiver() AmRtpReceiverThread::AmRtpReceiverThread() : stop_requested(false) +#ifdef USE_EPOLL + , poll_fd(-1) +#endif { +#ifndef USE_EPOLL fds = new struct pollfd[MAX_RTP_SESSIONS]; +#endif nfds = 0; } AmRtpReceiverThread::~AmRtpReceiverThread() { +#ifndef USE_EPOLL delete [] (fds); +#endif INFO("RTP receiver has been recycled.\n"); } @@ -97,11 +112,45 @@ void _AmRtpReceiver::dispose() void AmRtpReceiverThread::run() { + +#ifdef USE_EPOLL + struct epoll_event events[MAX_RTP_SESSIONS]; + + poll_fd = epoll_create(MAX_RTP_SESSIONS); + if (poll_fd == -1) { + throw string("failed epoll_create in AmRtpReceiverThread: "+string(strerror(errno))); + } +#else unsigned int tmp_nfds = 0; struct pollfd* tmp_fds = new struct pollfd[MAX_RTP_SESSIONS]; +#endif while(!stop_requested.get()){ - +#ifdef USE_EPOLL + int ret = epoll_wait(poll_fd,events,MAX_RTP_SESSIONS,RTP_POLL_TIMEOUT); + if(ret == -1 && errno 
!= EINTR){ + ERROR("AmRtpReceiver: epoll_wait: %s\n",strerror(errno)); + } + if(ret < 1) + continue; + + streams_mut.lock(); + for (int n = 0; n < ret; ++n) { + struct epoll_event &e = events[n]; + if(!(e.events & EPOLLIN)){ + continue; + } + StreamInfo *info = reinterpret_cast<StreamInfo *>(e.data.ptr); + AmRtpStream *stream = info->stream; + if(!stream){ + //this descriptor scheduled for removal + delete info; + continue; + } + stream->recvPacket(info->fd); + } + streams_mut.unlock(); +#else streams_mut.lock(); tmp_nfds = nfds; memcpy(tmp_fds,fds,nfds*sizeof(struct pollfd)); @@ -117,27 +166,35 @@ void AmRtpReceiverThread::run() for(unsigned int i=0; i<tmp_nfds; i++) { if(!(tmp_fds[i].revents & POLLIN)) - continue; + continue; streams_mut.lock(); Streams::iterator it = streams.find(tmp_fds[i].fd); if(it != streams.end()) { - it->second.stream->recvPacket(tmp_fds[i].fd); + it->second.stream->recvPacket(tmp_fds[i].fd); } - streams_mut.unlock(); + streams_mut.unlock(); } - } +#endif //#ifdef USE_EPOLL + } //while(!stop_requested.get()) + +#ifdef USE_EPOLL + close(poll_fd); +#else delete[] (tmp_fds); +#endif + } void AmRtpReceiverThread::addStream(int sd, AmRtpStream* stream) { + streams_mut.lock(); if(streams.find(sd) != streams.end()) { ERROR("trying to insert existing stream [%p] with sd=%i\n", - stream,sd); + stream,sd); streams_mut.unlock(); return; } @@ -145,15 +202,34 @@ void AmRtpReceiverThread::addStream(int sd, AmRtpStream* stream) if(nfds >= MAX_RTP_SESSIONS){ streams_mut.unlock(); ERROR("maximum number of sessions reached (%i)\n", - MAX_RTP_SESSIONS); + MAX_RTP_SESSIONS); throw string("maximum number of sessions reached"); } +#ifdef USE_EPOLL + struct epoll_event ev; + + std::pair<Streams::iterator, bool> s_it = streams.insert(std::make_pair(sd,new StreamInfo(sd,stream))); + + ev.events = EPOLLIN; + ev.data.ptr = s_it.first->second; + + if(epoll_ctl(poll_fd,EPOLL_CTL_ADD,sd,&ev)==-1){ + delete s_it.first->second; + streams.erase(s_it.first); + 
streams_mut.unlock(); + ERROR("failed to add to epoll structure stream [%p] with sd=%i, error: %s\n", + stream,sd,strerror(errno)); + return; + } +#else fds[nfds].fd = sd; fds[nfds].events = POLLIN; fds[nfds].revents = 0; streams.insert(std::make_pair(sd,StreamInfo(nfds,stream))); +#endif + nfds++; streams_mut.unlock(); @@ -169,6 +245,15 @@ void AmRtpReceiverThread::removeStream(int sd) return; } +#ifdef USE_EPOLL + if(epoll_ctl(poll_fd,EPOLL_CTL_DEL,sd,NULL)==-1){ + ERROR("removeStream epoll_ctl_del stream [%p] with sd = %i error %s", + sit->second->stream, sd, strerror(errno)); + } + sit->second->stream = NULL; //schedule for removal + streams.erase(sit); + --nfds; +#else unsigned int i = sit->second.index; if(--nfds && (i < nfds)) { fds[i] = fds[nfds]; @@ -179,6 +264,7 @@ void AmRtpReceiverThread::removeStream(int sd) } streams.erase(sd); +#endif streams_mut.unlock(); } diff --git a/core/AmRtpReceiver.h b/core/AmRtpReceiver.h index 0d0d511..dd1cd0e 100644 --- a/core/AmRtpReceiver.h +++ b/core/AmRtpReceiver.h @@ -32,7 +32,11 @@ #include "atomic_types.h" #include "singleton.h" +#ifdef USE_EPOLL +#include <sys/epoll.h> +#else #include <sys/select.h> +#endif #include <map> using std::greater; @@ -49,6 +53,16 @@ class _AmRtpReceiver; */ class AmRtpReceiverThread: public AmThread { +#ifdef USE_EPOLL + struct StreamInfo { + int fd; + AmRtpStream* stream; + + StreamInfo(int f, AmRtpStream* s) + : fd(f), stream(s) {} + }; + typedef std::map<int, StreamInfo *, greater<int> > Streams; +#else struct StreamInfo { unsigned int index; // index into fds table AmRtpStream* stream; @@ -56,15 +70,19 @@ class AmRtpReceiverThread: public AmThread { StreamInfo(unsigned int i, AmRtpStream* s) : index(i), stream(s) {} }; - typedef std::map<int, StreamInfo, greater<int> > Streams; +#endif Streams streams; AmMutex streams_mut; +#ifdef USE_EPOLL + int poll_fd; +#else struct pollfd* fds; - unsigned int nfds; - +#endif + unsigned int nfds; + AmRtpReceiverThread(); ~AmRtpReceiverThread(); -- 
1.8.5.3
_______________________________________________ Semsdev mailing list [email protected] http://lists.iptel.org/mailman/listinfo/semsdev
