hi community,
I ask to test and possibly adopt patch in attach.

on my system this reduces four times the CPU load of AmRtpProcessor
thread (i tested on 200 simultaneous calls)

changes consist in using epoll instead of poll and avoiding search by
hash on every rtp packet.

-- 
br
-Michael Furmur
>From 3743d6d8ea00444412e1f67a759a270775c47236 Mon Sep 17 00:00:00 2001
From: murfur <[email protected]>
Date: Thu, 20 Feb 2014 22:49:56 +0200
Subject: [PATCH] add epoll support for AmRtpReceiver

---
 CMakeLists.txt         |  12 ++++++
 Makefile.defs          |  12 ++++++
 core/AmRtpReceiver.cpp | 100 +++++++++++++++++++++++++++++++++++++++++++++----
 core/AmRtpReceiver.h   |  24 ++++++++++--
 4 files changed, 138 insertions(+), 10 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 2e809a1..3da366d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -45,6 +45,7 @@ OPTION(SEMS_USE_TTS           "Build with Text-to-speech support (requires Flite
 OPTION(SEMS_USE_OPENSSL       "Build with OpenSSL" OFF)
 OPTION(SEMS_USE_MONITORING    "Build with monitoring support" OFF)
 OPTION(SEMS_USE_IPV6          "Build with IPv6 support" OFF)
+OPTION(SEMS_USE_EPOLL         "Build with epoll support" ON)
 OPTION(MAX_RTP_SESSIONS:int   "How many rtp sessions SEMS will handle simultaneously? (default 2048)" 2048)
 
 # Fix weird static libs handling in old CMake
@@ -149,6 +150,17 @@ ELSE(SEMS_USE_IPV6)
 	MESSAGE(STATUS "Enable IPv6 support: NO (default)")
 ENDIF(SEMS_USE_IPV6)
 
+IF(SEMS_USE_EPOLL)
+	IF(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
+		ADD_DEFINITIONS(-DUSE_EPOLL)
+		MESSAGE(STATUS "Enable epoll support: YES (default)")
+	ELSE(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
+		MESSAGE(STATUS "Enable epoll support: NO (not supported)")
+	ENDIF(${CMAKE_SYSTEM_NAME} MATCHES "Linux")
+ELSE(SEMS_USE_EPOLL)
+	MESSAGE(STATUS "Enable epoll support: NO")
+ENDIF(SEMS_USE_EPOLL)
+
 # Let's try to find GSM library and header files
 FIND_PACKAGE(Gsm)
 IF(GSM_FOUND)
diff --git a/Makefile.defs b/Makefile.defs
index 0131d76..d277857 100644
--- a/Makefile.defs
+++ b/Makefile.defs
@@ -86,6 +86,11 @@ exclude_core_modules = g729 silk
 #
 USE_MONITORING=yes
 
+# build with support for epoll
+#
+#
+USE_EPOLL=yes
+
 # Support for long debug messages? (useful for debugging SIP messages' contents)
 #
 # disable for slight performance gain
@@ -123,6 +128,12 @@ ifdef USE_MONITORING
 CPPFLAGS += -DUSE_MONITORING
 endif
 
+ifdef USE_EPOLL
+ifeq ($(OS), linux)
+CPPFLAGS += -DUSE_EPOLL
+endif
+endif
+
 ifndef LONG_DEBUG_MESSAGE
 CPPFLAGS += -DLOG_BUFFER_LEN=2048
 endif
@@ -343,6 +354,7 @@ export USE_SPANDSP LIBSPANDSP_STATIC LIBSPANDSP_LDIR
 export USE_LIBSAMPLERATE USE_INTERNAL_RESAMPLER
 export WITH_ZRTP
 export USE_MONITORING
+export USE_EPOLL
 export exclude_core_modules exclude_app_modules
 endif  # ifeq ($(makefile_defs, 1)
 
diff --git a/core/AmRtpReceiver.cpp b/core/AmRtpReceiver.cpp
index f890bc1..bdf559f 100644
--- a/core/AmRtpReceiver.cpp
+++ b/core/AmRtpReceiver.cpp
@@ -39,13 +39,21 @@
 #endif
 
 #include <sys/time.h>
+#ifdef USE_EPOLL
+#include <sys/epoll.h>
+#else
 #include <sys/poll.h>
+#endif
 
 #ifndef MAX_RTP_SESSIONS
 #define MAX_RTP_SESSIONS 2048
 #endif 
 
+#ifdef USE_EPOLL
+#define RTP_POLL_TIMEOUT 500 /*500 ms*/
+#else
 #define RTP_POLL_TIMEOUT 50 /*50 ms*/
+#endif
 
 
 _AmRtpReceiver::_AmRtpReceiver()
@@ -61,14 +69,21 @@ _AmRtpReceiver::~_AmRtpReceiver()
 
 AmRtpReceiverThread::AmRtpReceiverThread()
   : stop_requested(false)
+#ifdef USE_EPOLL
+  , poll_fd(-1)
+#endif
 {
+#ifndef USE_EPOLL
   fds  = new struct pollfd[MAX_RTP_SESSIONS];
+#endif
   nfds = 0;
 }
 
 AmRtpReceiverThread::~AmRtpReceiverThread()
 {
+#ifndef USE_EPOLL
   delete [] (fds);
+#endif
   INFO("RTP receiver has been recycled.\n");
 }
 
@@ -97,11 +112,45 @@ void _AmRtpReceiver::dispose()
 
 void AmRtpReceiverThread::run()
 {
+
+#ifdef USE_EPOLL
+  struct epoll_event events[MAX_RTP_SESSIONS];
+
+  poll_fd = epoll_create(MAX_RTP_SESSIONS);
+    if (poll_fd == -1) {
+      throw string("failed epoll_create in AmRtpReceiverThread: "+string(strerror(errno)));
+    }
+#else
   unsigned int   tmp_nfds = 0;
   struct pollfd* tmp_fds  = new struct pollfd[MAX_RTP_SESSIONS];
+#endif
 
   while(!stop_requested.get()){
-	
+#ifdef USE_EPOLL
+    int ret = epoll_wait(poll_fd,events,MAX_RTP_SESSIONS,RTP_POLL_TIMEOUT);
+    if(ret == -1 && errno != EINTR){
+      ERROR("AmRtpReceiver: epoll_wait: %s\n",strerror(errno));
+    }
+    if(ret < 1)
+      continue;
+
+    streams_mut.lock();
+    for (int n = 0; n < ret; ++n) {
+      struct epoll_event &e = events[n];
+      if(!(e.events & EPOLLIN)){
+        continue;
+      }
+      StreamInfo *info = reinterpret_cast<StreamInfo *>(e.data.ptr);
+      AmRtpStream *stream = info->stream;
+      if(!stream){
+        //this descriptor scheduled for removal
+        delete info;
+        continue;
+      }
+      stream->recvPacket(info->fd);
+    }
+    streams_mut.unlock();
+#else
     streams_mut.lock();
     tmp_nfds = nfds;
     memcpy(tmp_fds,fds,nfds*sizeof(struct pollfd));
@@ -117,27 +166,35 @@ void AmRtpReceiverThread::run()
     for(unsigned int i=0; i<tmp_nfds; i++) {
 
       if(!(tmp_fds[i].revents & POLLIN))
-	continue;
+        continue;
 
       streams_mut.lock();
       Streams::iterator it = streams.find(tmp_fds[i].fd);
       if(it != streams.end()) {
-	it->second.stream->recvPacket(tmp_fds[i].fd);
+        it->second.stream->recvPacket(tmp_fds[i].fd);
       }
-      streams_mut.unlock();      
+      streams_mut.unlock();
     }
-  }
+#endif //#ifdef USE_EPOLL
 
+  } //while(!stop_requested.get())
+
+#ifdef USE_EPOLL
+  close(poll_fd);
+#else
   delete[] (tmp_fds);
+#endif
+
 }
 
 void AmRtpReceiverThread::addStream(int sd, AmRtpStream* stream)
 {
+
   streams_mut.lock();
 
   if(streams.find(sd) != streams.end()) {
     ERROR("trying to insert existing stream [%p] with sd=%i\n",
-	  stream,sd);
+    stream,sd);
     streams_mut.unlock();
     return;
   }
@@ -145,15 +202,34 @@ void AmRtpReceiverThread::addStream(int sd, AmRtpStream* stream)
   if(nfds >= MAX_RTP_SESSIONS){
     streams_mut.unlock();
     ERROR("maximum number of sessions reached (%i)\n",
-	  MAX_RTP_SESSIONS);
+    MAX_RTP_SESSIONS);
     throw string("maximum number of sessions reached");
   }
 
+#ifdef USE_EPOLL
+  struct epoll_event ev;
+
+  std::pair<Streams::iterator, bool> s_it = streams.insert(std::make_pair(sd,new StreamInfo(sd,stream)));
+
+  ev.events = EPOLLIN;
+  ev.data.ptr = s_it.first->second;
+
+  if(epoll_ctl(poll_fd,EPOLL_CTL_ADD,sd,&ev)==-1){
+    delete s_it.first->second;
+    streams.erase(s_it.first);
+    streams_mut.unlock();
+    ERROR("failed to add to epoll structure stream [%p] with sd=%i, error: %s\n",
+        stream,sd,strerror(errno));
+    return;
+  }
+#else
   fds[nfds].fd      = sd;
   fds[nfds].events  = POLLIN;
   fds[nfds].revents = 0;
 
   streams.insert(std::make_pair(sd,StreamInfo(nfds,stream)));
+#endif
+
   nfds++;
 
   streams_mut.unlock();
@@ -169,6 +245,15 @@ void AmRtpReceiverThread::removeStream(int sd)
     return;
   }
 
+#ifdef USE_EPOLL
+  if(epoll_ctl(poll_fd,EPOLL_CTL_DEL,sd,NULL)==-1){
+      ERROR("removeStream epoll_ctl_del stream [%p] with sd = %i error %s",
+            sit->second->stream, sd, strerror(errno));
+  }
+  sit->second->stream = NULL; //schedule for removal
+  streams.erase(sit);
+  --nfds;
+#else
   unsigned int i = sit->second.index;
   if(--nfds && (i < nfds)) {
     fds[i] = fds[nfds];
@@ -179,6 +264,7 @@ void AmRtpReceiverThread::removeStream(int sd)
   }
   streams.erase(sd);
 
+#endif
   streams_mut.unlock();
 }
 
diff --git a/core/AmRtpReceiver.h b/core/AmRtpReceiver.h
index 0d0d511..dd1cd0e 100644
--- a/core/AmRtpReceiver.h
+++ b/core/AmRtpReceiver.h
@@ -32,7 +32,11 @@
 #include "atomic_types.h"
 #include "singleton.h"
 
+#ifdef USE_EPOLL
+#include <sys/epoll.h>
+#else
 #include <sys/select.h>
+#endif
 
 #include <map>
 using std::greater;
@@ -49,6 +53,16 @@ class _AmRtpReceiver;
  */
 class AmRtpReceiverThread: public AmThread {
 
+#ifdef USE_EPOLL
+  struct StreamInfo {
+    int fd;
+    AmRtpStream* stream;
+
+    StreamInfo(int f, AmRtpStream* s)
+      : fd(f), stream(s) {}
+  };
+  typedef std::map<int, StreamInfo *, greater<int> > Streams;
+#else
   struct StreamInfo {
     unsigned int index; // index into fds table
     AmRtpStream* stream;
@@ -56,15 +70,19 @@ class AmRtpReceiverThread: public AmThread {
     StreamInfo(unsigned int i, AmRtpStream* s)
       : index(i), stream(s) {}
   };
-
   typedef std::map<int, StreamInfo, greater<int> > Streams;
+#endif
 
   Streams  streams;
   AmMutex  streams_mut;
 
+#ifdef USE_EPOLL
+  int poll_fd;
+#else
   struct pollfd* fds;
-  unsigned int   nfds;
-    
+#endif
+  unsigned int nfds;
+
   AmRtpReceiverThread();
   ~AmRtpReceiverThread();
     
-- 
1.8.5.3

_______________________________________________
Semsdev mailing list
[email protected]
http://lists.iptel.org/mailman/listinfo/semsdev

Reply via email to