Module: sems
Branch: master
Commit: 5a339944ba72dc1432f58fad3a56f574506b4c74
URL:    
http://git.sip-router.org/cgi-bin/gitweb.cgi/sems/?a=commit;h=5a339944ba72dc1432f58fad3a56f574506b4c74

Author: Raphael Coeffic <[email protected]>
Committer: Raphael Coeffic <[email protected]>
Date:   Tue Sep 11 10:42:04 2012 +0200

rtcp relay draft (WIP)

---

 core/AmRtpReceiver.cpp |    2 +-
 core/AmRtpStream.cpp   |  111 +++++++++++++++++++++++++++++++++++++++++-------
 core/AmRtpStream.h     |   13 ++++--
 3 files changed, 105 insertions(+), 21 deletions(-)

diff --git a/core/AmRtpReceiver.cpp b/core/AmRtpReceiver.cpp
index 1b6ce0b..86b7cae 100644
--- a/core/AmRtpReceiver.cpp
+++ b/core/AmRtpReceiver.cpp
@@ -136,7 +136,7 @@ void AmRtpReceiverThread::run()
       streams_mut.lock();
       Streams::iterator it = streams.find(tmp_fds[i].fd);
       if(it != streams.end()) {
-       it->second.stream->recvPacket();
+       it->second.stream->recvPacket(tmp_fds[i].fd);
       }
       streams_mut.unlock();      
     }
diff --git a/core/AmRtpStream.cpp b/core/AmRtpStream.cpp
index 0af417d..f2a2cea 100644
--- a/core/AmRtpStream.cpp
+++ b/core/AmRtpStream.cpp
@@ -101,7 +101,7 @@ int AmRtpStream::getLocalSocket()
   if (l_sd)
     return l_sd;
 
-  int sd=0;
+  int sd=0, rctp_sd=0;
 #ifdef SUPPORT_IPV6
   if((sd = socket(l_saddr.ss_family,SOCK_DGRAM,0)) == -1)
 #else
@@ -111,6 +111,17 @@ int AmRtpStream::getLocalSocket()
        ERROR("%s\n",strerror(errno));
        throw string ("while creating new socket.");
       } 
+
+#ifdef SUPPORT_IPV6
+  if((rtcp_sd = socket(l_saddr.ss_family,SOCK_DGRAM,0)) == -1)
+#else
+    if((rtcp_sd = socket(PF_INET,SOCK_DGRAM,0)) == -1)
+#endif
+      {
+       ERROR("%s\n",strerror(errno));
+       throw string ("while creating new RTCP socket.");
+      } 
+
   int true_opt = 1;
   if(ioctl(sd, FIONBIO , &true_opt) == -1){
     ERROR("%s\n",strerror(errno));
@@ -118,7 +129,15 @@ int AmRtpStream::getLocalSocket()
     throw string ("while setting socket non blocking.");
   }
 
+  if(ioctl(rtcp_sd, FIONBIO , &true_opt) == -1){
+    ERROR("%s\n",strerror(errno));
+    close(sd);
+    throw string ("while setting socket non blocking.");
+  }
+
   l_sd = sd;
+  l_rtcp_sd = rtcp_sd;
+
   return l_sd;
 }
 
@@ -127,43 +146,63 @@ void AmRtpStream::setLocalPort()
   if(l_port)
     return;
   
-  if (!getLocalSocket())
-    return;
-
   if(l_if < 0) {
     l_if = session->dlg.getOutboundIf();
   }
   
   int retry = 10;
-  unsigned short port = 0;
+  unsigned short port = 0, rtcp_port = 0;
   for(;retry; --retry){
 
+    if (!getLocalSocket())
+      return;
+
     port = AmConfig::Ifs[l_if].getNextRtpPort();
 #ifdef SUPPORT_IPV6
+    set_port_v6(&l_saddr,port+1);
+    if(bind(l_rtcp_sd,(const struct sockaddr*)&l_saddr,
+            sizeof(struct sockaddr_storage)))
+#else  
+    l_saddr.sin_port = htons(port+1);
+    if(bind(l_rtcp_sd,(const struct sockaddr*)&l_saddr,
+            sizeof(struct sockaddr_in)))
+#endif
+    {
+      DBG("RTCP bind: %s\n",strerror(errno));
+      goto try_another_port;
+    }
+
+#ifdef SUPPORT_IPV6
     set_port_v6(&l_saddr,port);
-    if(!bind(l_sd,(const struct sockaddr*)&l_saddr,
+    if(bind(l_sd,(const struct sockaddr*)&l_saddr,
             sizeof(struct sockaddr_storage)))
 #else  
     l_saddr.sin_port = htons(port);
-    if(!bind(l_sd,(const struct sockaddr*)&l_saddr,
+    if(bind(l_sd,(const struct sockaddr*)&l_saddr,
             sizeof(struct sockaddr_in)))
 #endif
-      {
-       break;
-      }
-    else {
+    {
       DBG("bind: %s\n",strerror(errno));               
+      goto try_another_port;
     }
+
+    // both bind() succeeded!
+    break;
+
+  try_another_port:
+      close(l_sd);
+      l_sd = 0;
+      close(l_rtcp_sd);
+      l_rtcp_sd = 0;
   }
 
   int true_opt = 1;
   if (!retry){
     ERROR("could not find a free RTP port\n");
-    close(l_sd);
-    l_sd = 0;
     throw string("could not find a free RTP port");
   }
 
+  // rco: does that make sense after bind() ????
   if(setsockopt(l_sd, SOL_SOCKET, SO_REUSEADDR,
                (void*)&true_opt, sizeof (true_opt)) == -1) {
 
@@ -174,9 +213,11 @@ void AmRtpStream::setLocalPort()
   }
 
   l_port = port;
+  l_rtcp_port = port+1;
   AmRtpReceiver::instance()->addStream(l_sd, this);
-  DBG("added stream [%p] to RTP receiver (%s:%i)\n", this,
-      get_addr_str((sockaddr_storage*)&l_saddr).c_str(),l_port);
+  AmRtpReceiver::instance()->addStream(l_rtcp_sd, this);
+  DBG("added stream [%p] to RTP receiver (%s:%i/%i)\n", this,
+      get_addr_str((sockaddr_storage*)&l_saddr).c_str(),l_port,l_rtcp_port);
 }
 
 int AmRtpStream::ping()
@@ -889,8 +930,13 @@ int AmRtpStream::nextPacket(AmRtpPacket*& p)
   return 1;
 }
 
-void AmRtpStream::recvPacket()
+void AmRtpStream::recvPacket(int fd)
 {
+  if(fd == l_rtcp_sd){
+    recvRtcpPacket();
+    return;
+  }
+
   AmRtpPacket* p = mem.newPacket();
   if (!p) {
     // drop received data
@@ -915,6 +961,39 @@ void AmRtpStream::recvPacket()
   }
 }
 
+void AmRtpStream::recvRtcpPacket()
+{
+  unsigned char buffer[4096];
+
+  int recved_bytes = recv(l_rtcp_sd,buffer,sizeof(buffer),0);
+  if(recved_bytes < 0) {
+    ERROR("rtcp recv(%d): %s",l_rtcp_sd,strerror(errno));
+    return;
+  }
+
+  if(!relay_enabled || !relay_stream ||
+     !relay_stream->l_sd)
+    return;
+
+  if((size_t)recved_bytes > sizeof(buffer)) {
+    ERROR("recved huge RTCP packet (%d)",recved_bytes);
+    return;
+  }
+
+  struct sockaddr_in rtcp_raddr;
+  memcpy(&rtcp_raddr,&relay_stream->r_saddr,sizeof(struct sockaddr_in));
+  rtcp_raddr.sin_port = htons(relay_stream->l_rtcp_port);
+
+  int err = sendto(relay_stream->l_sd,buffer,recved_bytes,0,
+              (const struct sockaddr *)&rtcp_raddr,
+              sizeof(struct sockaddr_in));
+  
+  if(err == -1){
+    ERROR("could not relay RTCP packet: %s\n",strerror(errno));
+    return;
+  }
+}
+
 void AmRtpStream::relay(AmRtpPacket* p) {
   if (!l_port) // not yet initialized
     return;
diff --git a/core/AmRtpStream.h b/core/AmRtpStream.h
index 9764487..aa0319f 100644
--- a/core/AmRtpStream.h
+++ b/core/AmRtpStream.h
@@ -205,6 +205,12 @@ protected:
   /** Local socket */
   int                l_sd;
 
+  /** Local RTCP port */
+  unsigned int l_rtcp_port;
+
+  /** Local RTCP socket */
+  int          l_rtcp_sd;
+
   /** Timestamp of the last received RTP packet */
   struct timeval last_recv_time;
 
@@ -257,9 +263,6 @@ protected:
   /** Payload provider */
   AmPayloadProvider* payload_provider;
 
-  // get the next available port within configured range
-  static int getNextPort();
-
   /** Insert an RTP packet to the buffer queue */
   void bufferPacket(AmRtpPacket* p);
   /* Get next packet from the buffer queue */
@@ -318,7 +321,9 @@ public:
   int receive( unsigned char* buffer, unsigned int size,
               unsigned int& ts, int& payload );
 
-  void recvPacket();
+  void recvPacket(int fd);
+
+  void recvRtcpPacket();
 
   /** ping the remote side, to open NATs and enable symmetric RTP */
   int ping();

_______________________________________________
Semsdev mailing list
[email protected]
http://lists.iptel.org/mailman/listinfo/semsdev

Reply via email to