Module: sems Branch: master Commit: 5a339944ba72dc1432f58fad3a56f574506b4c74 URL: http://git.sip-router.org/cgi-bin/gitweb.cgi/sems/?a=commit;h=5a339944ba72dc1432f58fad3a56f574506b4c74
Author: Raphael Coeffic <[email protected]> Committer: Raphael Coeffic <[email protected]> Date: Tue Sep 11 10:42:04 2012 +0200 rtcp relay draft (WIP) --- core/AmRtpReceiver.cpp | 2 +- core/AmRtpStream.cpp | 111 +++++++++++++++++++++++++++++++++++++++++------- core/AmRtpStream.h | 13 ++++-- 3 files changed, 105 insertions(+), 21 deletions(-) diff --git a/core/AmRtpReceiver.cpp b/core/AmRtpReceiver.cpp index 1b6ce0b..86b7cae 100644 --- a/core/AmRtpReceiver.cpp +++ b/core/AmRtpReceiver.cpp @@ -136,7 +136,7 @@ void AmRtpReceiverThread::run() streams_mut.lock(); Streams::iterator it = streams.find(tmp_fds[i].fd); if(it != streams.end()) { - it->second.stream->recvPacket(); + it->second.stream->recvPacket(tmp_fds[i].fd); } streams_mut.unlock(); } diff --git a/core/AmRtpStream.cpp b/core/AmRtpStream.cpp index 0af417d..f2a2cea 100644 --- a/core/AmRtpStream.cpp +++ b/core/AmRtpStream.cpp @@ -101,7 +101,7 @@ int AmRtpStream::getLocalSocket() if (l_sd) return l_sd; - int sd=0; + int sd=0, rctp_sd=0; #ifdef SUPPORT_IPV6 if((sd = socket(l_saddr.ss_family,SOCK_DGRAM,0)) == -1) #else @@ -111,6 +111,17 @@ int AmRtpStream::getLocalSocket() ERROR("%s\n",strerror(errno)); throw string ("while creating new socket."); } + +#ifdef SUPPORT_IPV6 + if((rtcp_sd = socket(l_saddr.ss_family,SOCK_DGRAM,0)) == -1) +#else + if((rtcp_sd = socket(PF_INET,SOCK_DGRAM,0)) == -1) +#endif + { + ERROR("%s\n",strerror(errno)); + throw string ("while creating new RTCP socket."); + } + int true_opt = 1; if(ioctl(sd, FIONBIO , &true_opt) == -1){ ERROR("%s\n",strerror(errno)); @@ -118,7 +129,15 @@ int AmRtpStream::getLocalSocket() throw string ("while setting socket non blocking."); } + if(ioctl(rtcp_sd, FIONBIO , &true_opt) == -1){ + ERROR("%s\n",strerror(errno)); + close(sd); + throw string ("while setting socket non blocking."); + } + l_sd = sd; + l_rtcp_sd = rtcp_sd; + return l_sd; } @@ -127,43 +146,63 @@ void AmRtpStream::setLocalPort() if(l_port) return; - if (!getLocalSocket()) - return; - if(l_if < 0) { l_if = session->dlg.getOutboundIf(); } int retry = 10; - unsigned short port = 0; + unsigned short port = 0, rtcp_port = 0; for(;retry; --retry){ + if (!getLocalSocket()) + return; + port = AmConfig::Ifs[l_if].getNextRtpPort(); #ifdef SUPPORT_IPV6 + set_port_v6(&l_saddr,port+1); + if(bind(l_rtcp_sd,(const struct sockaddr*)&l_saddr, + sizeof(struct sockaddr_storage))) +#else + l_saddr.sin_port = htons(port+1); + if(bind(l_rtcp_sd,(const struct sockaddr*)&l_saddr, + sizeof(struct sockaddr_in))) +#endif + { + DBG("RTCP bind: %s\n",strerror(errno)); + goto try_another_port; + } + +#ifdef SUPPORT_IPV6 set_port_v6(&l_saddr,port); - if(!bind(l_sd,(const struct sockaddr*)&l_saddr, + if(bind(l_sd,(const struct sockaddr*)&l_saddr, sizeof(struct sockaddr_storage))) #else l_saddr.sin_port = htons(port); - if(!bind(l_sd,(const struct sockaddr*)&l_saddr, + if(bind(l_sd,(const struct sockaddr*)&l_saddr, sizeof(struct sockaddr_in))) #endif - { - break; - } - else { + { DBG("bind: %s\n",strerror(errno)); + goto try_another_port; } + + // both bind() succeeded! + break; + + try_another_port: + close(l_sd); + l_sd = 0; + close(l_rtcp_sd); + l_rtcp_sd = 0; } int true_opt = 1; if (!retry){ ERROR("could not find a free RTP port\n"); - close(l_sd); - l_sd = 0; throw string("could not find a free RTP port"); } + // rco: does that make sense after bind() ???? if(setsockopt(l_sd, SOL_SOCKET, SO_REUSEADDR, (void*)&true_opt, sizeof (true_opt)) == -1) { @@ -174,9 +213,11 @@ void AmRtpStream::setLocalPort() } l_port = port; + l_rtcp_port = port+1; AmRtpReceiver::instance()->addStream(l_sd, this); - DBG("added stream [%p] to RTP receiver (%s:%i)\n", this, - get_addr_str((sockaddr_storage*)&l_saddr).c_str(),l_port); + AmRtpReceiver::instance()->addStream(l_rtcp_sd, this); + DBG("added stream [%p] to RTP receiver (%s:%i/%i)\n", this, + get_addr_str((sockaddr_storage*)&l_saddr).c_str(),l_port,l_rtcp_port); } int AmRtpStream::ping() @@ -889,8 +930,13 @@ int AmRtpStream::nextPacket(AmRtpPacket*& p) return 1; } -void AmRtpStream::recvPacket() +void AmRtpStream::recvPacket(int fd) { + if(fd == l_rtcp_sd){ + recvRtcpPacket(); + return; + } + AmRtpPacket* p = mem.newPacket(); if (!p) { // drop received data @@ -915,6 +961,39 @@ void AmRtpStream::recvPacket() } } +void AmRtpStream::recvRtcpPacket() +{ + unsigned char buffer[4096]; + + int recved_bytes = recv(l_rtcp_sd,buffer,sizeof(buffer),0); + if(recved_bytes < 0) { + ERROR("rtcp recv(%d): %s",l_rtcp_sd,strerror(errno)); + return; + } + + if(!relay_enabled || !relay_stream || + !relay_stream->l_sd) + return; + + if((size_t)recved_bytes > sizeof(buffer)) { + ERROR("recved huge RTCP packet (%d)",recved_bytes); + return; + } + + struct sockaddr_in rtcp_raddr; + memcpy(&rtcp_raddr,&relay_stream->r_saddr,sizeof(struct sockaddr_in)); + rtcp_raddr.sin_port = htons(relay_stream->l_rtcp_port); + + int err = sendto(relay_stream->l_sd,buffer,recved_bytes,0, + (const struct sockaddr *)&rtcp_raddr, + sizeof(struct sockaddr_in)); + + if(err == -1){ + ERROR("could not relay RTCP packet: %s\n",strerror(errno)); + return; + } +} + void AmRtpStream::relay(AmRtpPacket* p) { if (!l_port) // not yet initialized return; diff --git a/core/AmRtpStream.h b/core/AmRtpStream.h index 9764487..aa0319f 100644 --- a/core/AmRtpStream.h +++ b/core/AmRtpStream.h @@ -205,6 +205,12 @@ protected: /** Local socket */ int l_sd; + /** Local RTCP port */ + unsigned int l_rtcp_port; + + /** Local RTCP socket */ + int l_rtcp_sd; + /** Timestamp of the last received RTP packet */ struct timeval last_recv_time; @@ -257,9 +263,6 @@ protected: /** Payload provider */ AmPayloadProvider* payload_provider; - // get the next available port within configured range - static int getNextPort(); - /** Insert an RTP packet to the buffer queue */ void bufferPacket(AmRtpPacket* p); /* Get next packet from the buffer queue */ @@ -318,7 +321,9 @@ public: int receive( unsigned char* buffer, unsigned int size, unsigned int& ts, int& payload ); - void recvPacket(); + void recvPacket(int fd); + + void recvRtcpPacket(); /** ping the remote side, to open NATs and enable symmetric RTP */ int ping(); _______________________________________________ Semsdev mailing list [email protected] http://lists.iptel.org/mailman/listinfo/semsdev
