Hi,

I was having a few issues with mediaproxy when it came to re-invites and
call setup where the SDP does not get sent with the first INVITE. I
noticed this was due to a couple of assumptions in mediaproxy which
don't always hold:

1) That SDP is always set up first by the caller (i.e. in the INVITE)
then updated by the called party (i.e. in the 200). This is not always
the case. The first SDP can be sent in the 200 response by the called
party, and the caller then sends their SDP in the ACK.
2) That RTP streams don't change once set up. This is wrong for reINVITEs.
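
To make the two orderings in point 1 concrete, here is a small sketch. It
is not code from mediaproxy; the function name and the tuple layout are
made up for illustration. It just shows that the party who sends the
first SDP is not always the caller.

```python
def sdp_roles(messages):
    """Return (offerer, answerer) based on who sent SDP first.

    `messages` is a list of (sender, message, has_sdp) tuples in the
    order they were sent; sender is 'caller' or 'called'.
    This layout is an assumption made for the example only.
    """
    with_sdp = [sender for sender, _message, has_sdp in messages if has_sdp]
    if len(with_sdp) < 2:
        return None  # negotiation has not finished yet
    return with_sdp[0], with_sdp[1]


# INVITE + SDP -> 200 + SDP -> ACK (the common case)
EARLY_OFFER = [('caller', 'INVITE', True),
               ('called', '200', True),
               ('caller', 'ACK', False)]

# INVITE -> 200 + SDP -> ACK + SDP (the case mediaproxy did not expect)
LATE_OFFER = [('caller', 'INVITE', False),
              ('called', '200', True),
              ('caller', 'ACK', True)]

print(sdp_roles(EARLY_OFFER))
print(sdp_roles(LATE_OFFER))
```

In the second flow the called party's SDP arrives first, so any code that
only creates a session from the caller's SDP never gets started.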

This would cause problems when a user agent sent a reINVITE to redirect
RTP to on-hold music, for example - the RTP change would be ignored. I
think this had an impact on some fax setups as well, but I haven't
tested it. I found a patch to fix this by storing the streams in a hash
by media type, but that is a limiting assumption too (maybe I want 2
audio streams?).
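
A quick sketch of why keying by media type is limiting. It assumes the
comma-separated "ip:port:mediatype" stream format that rtphandler.py
splits on; the addresses and variable names are invented for the example.

```python
def parse_streams(streams):
    """Split 'ip:port:type,ip:port:type' into a list of [ip, port, type]."""
    return [s.split(':') for s in streams.split(',')]


# Two audio streams in one SDP (example addresses only)
streams = parse_streams("192.0.2.10:4000:audio,192.0.2.10:4002:audio")

# Keyed by media type: the second audio stream overwrites the first
by_type = {sinfo[2]: sinfo for sinfo in streams}

# Keyed by position in the SDP: both streams survive
by_index = list(streams)

print(len(by_type), len(by_index))
```

Matching streams by their position in the SDP, which is what the attached
patch does, keeps both audio streams.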

The attached patch fixes this by changing the following:

1) This patch allows a mediaproxy RTPSession to be set up by either the
caller or called party. A 'caller' argument is passed to the RTPSession
functions telling it whether the caller or the called party is doing the
setup or update.

2) The official mediaproxy does not even look at SDP details for streams
which are already set up. This patch lets every SDP packet update all of
the RTP streams for that party (either caller or called).
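
The two changes above boil down to something like the following sketch.
This is a stripped-down model, not the patched rtphandler.py: the Stream
and Session classes and their attributes are stand-ins invented for the
example, and the port allocation, RTCP and sign-in handling are left out.

```python
class Stream:
    """Hypothetical stand-in for a media stream with two endpoints."""
    def __init__(self, mediatype):
        self.mediatype = mediatype
        self.caller = None  # (ip, port) announced by the caller
        self.called = None  # (ip, port) announced by the called party


class Session:
    """Hypothetical stand-in for a session holding streams by position."""
    def __init__(self):
        self.streams = []

    def update_streams(self, sdp_streams, caller):
        """Apply one party's SDP to every stream, adding streams as needed."""
        for i, field in enumerate(sdp_streams.split(',')):
            parts = field.split(':')
            ip, port = parts[0], int(parts[1] or 0)
            mediatype = parts[2] if len(parts) > 2 else 'Audio'
            if i >= len(self.streams):
                # First time this position is seen: create the stream
                self.streams.append(Stream(mediatype))
            stream = self.streams[i]
            # Always overwrite, so a reINVITE can move the RTP endpoint
            if caller:
                stream.caller = (ip, port)
            else:
                stream.called = (ip, port)


session = Session()
session.update_streams("192.0.2.20:5000:audio", caller=False)  # 200 + SDP
session.update_streams("192.0.2.10:4000:audio", caller=True)   # ACK + SDP
session.update_streams("192.0.2.30:6000:audio", caller=True)   # reINVITE
print(session.streams[0].caller, session.streams[0].called)
```

Either party can create the session, and a later SDP from the same party
replaces that party's endpoint instead of being ignored.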

If you have issues with one-way audio with mediaproxy after reINVITEs,
have calls where the first SDP comes from the called party, or are just
plain adventurous, please give it a go. It has been used in a production
environment for a bit over a month now. The patch was made against
mediaproxy 1.7.2, but will work against 1.8.0 as well since none of the
rtphandler.py code changed.

Regards,
Jeff
diff -ur mediaproxy-1.7.2/modules/rtphandler.py mediaproxy-1.7.2-jeffw3/modules/rtphandler.py
--- mediaproxy-1.7.2/modules/rtphandler.py	2006-04-10 01:38:42.000000000 +0800
+++ mediaproxy-1.7.2-jeffw3/modules/rtphandler.py	2007-02-08 22:14:40.000000000 +0900
@@ -502,15 +502,18 @@
             return "%s %s/%s/%s" % ((len(Sessions),) + tuple(Traffic))
         elif cmd == 'version':
             return release
-        elif cmd == 'request':
-            sessionId = request.sessionId
+        elif cmd == 'request' or cmd == 'lookup':
+            # callers are request, called are lookup
+            caller = (cmd == 'request')
+
             try:
-                session = Sessions[sessionId]
+                session = Sessions[request.sessionId]
             except KeyError:
-                if request.info.totag:
-                    ## Don't create a new session on re-INVITEs
-                    ## (we may decided not to use mediaproxy on the 1st INVITE)
-                    return None
+                #if request.info.totag:
+                #    ## Don't create a new session on re-INVITEs
+                #    ## (we may decided not to use mediaproxy on the 1st INVITE)
+                #    print "request.info.totag"
+                #    return None
                 ## Create a brand new session
                 # todo: embed logic of encoding/decoding address to/from string into StructureInfo -Dan
                 if request.info.dispatcher is not None:
@@ -528,28 +531,25 @@
                         else:
                             request.info.dispatcher = address
                 try:
-                    session = RTPSession(request)
+                    session = RTPSession(request, caller)
                 except RTPSessionError, why:
-                    raise InvalidRequestError, "cannot create session for '%s': %s" % (sessionId, why)
+                    raise InvalidRequestError, "cannot create session for '%s': %s" % (request.sessionId, why)
             else:
-                session.updateStreams(request)
+                session.updateStreams(request, caller)
+
+            # Mmmmm, I'm a bit dubious about this section...
+            # I think the on hold status should be per RTP stream
             ips = [address.split(':')[0] for address in request.streams.split(',')]
             ## A session can be put on hold only after both parties have sent their
             ## contact data. However they don't have to sign in on RTP before
             ## they can put the call on hold (they may have silence detection)
-            if session.complete and '0.0.0.0' in ips:
+            if session.callerComplete and session.calledComplete and '0.0.0.0' in ips:
                 session.onHold  = True
                 session.timeout = ProxyConfig.holdTimeout
             else:
                 session.onHold  = False
                 session.timeout = ProxyConfig.idleTimeout
-            return session.endpointAddresses()
-        elif cmd == 'lookup':
-            try:
-                session = Sessions[request.sessionId]
-            except KeyError:
-                return None
-            session.fillInCalledParty(request)
+
             # remove (or make it show in debug mode only)
             #for stream in session.mediaStreams:
                 #stream.rtpStream.caller.show('rtp', compact=1)
@@ -558,6 +558,7 @@
                 #stream.rtcpStream.called.show('rtcp', compact=1)
             # until here
             return session.endpointAddresses()
+
         elif cmd == 'delete':
             try:
                 session = Sessions[request.sessionId]
@@ -629,6 +630,13 @@
         self.__v.updateContacts()
         self.name = name
 
+    def update(self, contactIP=None, rtpPort=0, visibleIP=None, local=0, asymmetric=0):
+        self.contactIP = contactIP
+        self.rtpPort = rtpPort
+        self.visibleIP = visibleIP
+        self.local = local
+        self.asymmetric = asymmetric
+
     def getci(self): return self.__v.contactIP
     def getvi(self): return self.__v.visibleIP
     def getei(self): return self.__v.expectedIP
@@ -753,6 +761,7 @@
         bytes = len(data) + 28 ## 28 is from IP+UDP headers
         caller = self.caller
         called = self.called
+
         if address == caller.signedInAddress:
             sender = 0
             destination = called.address
@@ -791,6 +800,14 @@
             self.signIn(called, address, data)
             sender = 1
             destination = caller.address
+        elif caller.signedInAddress and address[0]==caller.visibleIP and address[1]==caller.rtpPort:
+            self.signIn(caller, address, data)
+            sender = 0
+            destination = called.address
+        elif called.signedInAddress and address[0]==called.visibleIP and address[1]==called.rtpPort:
+            self.signIn(called, address, data)
+            sender = 1
+            destination = caller.address
         else:
             ## Allow called party to change address (for example after a 183 from
             ## PSTN, if the call is not answered it is diverted to an announcement
@@ -875,10 +892,14 @@
         self.rtcpStream.close()
 
     def setCaller(self, address, visibleIP, asymmetric):
-        if not self.rtpStream.caller:
-            ip = address[0]
-            port = int(address[1] or 0)
-            local = self.session.localCaller
+        ip = address[0]
+        port = int(address[1] or 0)
+        local = self.session.localCaller
+        if self.rtpStream.caller:
+            # update the RTPPeer
+            self.rtpStream.caller.update(ip, port,   visibleIP, local, asymmetric)
+            self.rtcpStream.caller.update(ip, port+1, visibleIP, local, asymmetric)
+        else:
             rtpStream  = self.rtpStream
             rtcpStream = self.rtcpStream
             rtpStream.caller  = RTPPeer('caller', ip, port,   visibleIP, local, asymmetric)
@@ -886,10 +907,14 @@
             self.complete = rtpStream.called is not None
 
     def setCalled(self, address, visibleIP, asymmetric):
-        if not self.rtpStream.called:
-            ip = address[0]
-            port = int(address[1] or 0)
-            local = self.session.localCalled
+        ip = address[0]
+        port = int(address[1] or 0)
+        local = self.session.localCalled
+        if self.rtpStream.called:
+            # update the RTPPeer
+            self.rtpStream.called.update(ip, port,   visibleIP, local, asymmetric)
+            self.rtcpStream.called.update(ip, port+1, visibleIP, local, asymmetric)
+        else:
             rtpStream  = self.rtpStream
             rtcpStream = self.rtcpStream
             rtpStream.called  = RTPPeer('called', ip, port,   visibleIP, local, asymmetric)
@@ -951,52 +976,41 @@
 
 
 class RTPSession(object):
-    def __init__(self, request):
+    # Either the caller or called can initialise the RTPSession first
+    # The order can go:
+    # INVITE + SDP -> 200 + SDP -> ACK (Most common, caller initialises RTPSession)
+    # INVITE -> 200 + SDP -> ACK + SDP (Still valid, called initialises RTPSession)
+    def __init__(self, request, caller):
         global Sessions
 
+        # These can be initialised by either end of the call
         self._id = request.sessionId
         self.callerDomain = request.callerDomain
         self.calledDomain = request.calledDomain
         self.localCaller  = request.localCaller
         self.localCalled  = request.localCalled
-        self.userAgents = [request.userAgent.decode('quopri'), 'Unknown']
-        self.encodedAgents = [request.userAgent, 'Unknown']
+        self.startTime = round(time.time())
+        self.conversationStart = None
+        self.timeout = ProxyConfig.idleTimeout
+        # onHold should probably be a per mediaStream setting...
+        self.onHold = False
+        self.forceClosed = False
+        self.callerComplete = False
+        self.calledComplete = False
+        self.ended = False
+        # These get initialised in updateStreams
+        self.userAgents = ['Unknown', 'Unknown']
+        self.encodedAgents = ['Unknown', 'Unknown']
         self.dispatcher = request.info.dispatcher
         self.callerInfo = request.info
         ## We need this info until the called party signs in, else if a CANCEL
         ## from the caller comes before the OK it will raise an exception because
         ## there is no calledInfo to compute the stats when the session ends
         self.calledInfo = request.info ## eventually make a copy of it
-        visibleIP = request.signalingIP
-        asymmetric = request.info.flags.asymmetric
-        streams = [s.split(':') for s in request.streams.split(',')]
         self.mediaStreams = []
-        for sinfo in streams:
-            try:
-                mediatype = sinfo[2]
-            except IndexError:
-                mediatype = 'Audio'
-                # remove at a later time
-                print("warning: SER is using an old mediaproxy.so module. "
-                      "Please upgrade it to get correct media type information.")
-            try:
-                stream = MediaStream(self, mediatype)
-            except MediaStreamError:
-                for stream in self.mediaStreams:
-                    stream.close()
-                raise RTPSessionError, "cannot find enough free ports for the media streams"
-            stream.setCaller(sinfo, visibleIP, asymmetric)
-            self.mediaStreams.append(stream)
+        self.updateStreams(request, caller)
         if not self.mediaStreams:
             raise RTPSessionError, "there are no media streams to process"
-        self.startTime = round(time.time())
-        self.conversationStart = None
-        self.timeout = ProxyConfig.idleTimeout
-        self.onHold = False
-        self.forceClosed = False
-        self.complete = 0 ## Will need the destination filled in as well
-        self.needStreamUpdate = 0 ## If new streams are added on the fly
-        self.ended = False
 
         Sessions[self._id] = self
         adr = self.endpointAddresses().split()
@@ -1066,58 +1080,64 @@
     del(getit, getdt, getco, getty, getta, getdu)
     del(getsu, getby, getpa, getcd, getsl, getin)
 
-    def updateStreams(self, request):
+    def updateStreams(self, request, caller):
         streams = [s.split(':') for s in request.streams.split(',')]
-        streamCount = len(self.mediaStreams)
-        if streamCount >= len(streams):
-            return
         visibleIP = request.signalingIP
-        asymmetric = self.callerInfo.flags.asymmetric
-        for sinfo in streams[streamCount:]:
+        asymmetric = None
+
+        # initialise the caller or called specific settings
+        if caller:
+            asymmetric = self.callerInfo.flags.asymmetric
+            if not self.callerComplete:
+                self.userAgents[0] = request.userAgent.decode('quopri')
+                self.encodedAgents[0] = request.userAgent
+                self.callerInfo = request.info
+                self.conversationStart = round(time.time())
+                self.callerComplete = True
+        else:
+            asymmetric = self.calledInfo.flags.asymmetric
+            if not self.calledComplete:
+                self.userAgents[1] = request.userAgent.decode('quopri')
+                self.encodedAgents[1] = request.userAgent
+                self.calledInfo = request.info
+                self.conversationStart = round(time.time())
+                self.calledComplete = True
+
+        # initialise the mediaStreams
+        for i in range(len(streams)):
+            sinfo = streams[i]
             try:
-                mediatype = sinfo[2]
+                stream = self.mediaStreams[i]
+                # update the stream
+                if caller:
+                    stream.setCaller(sinfo, visibleIP, asymmetric)
+                else:
+                    stream.setCalled(sinfo, visibleIP, asymmetric)
             except IndexError:
-                mediatype = 'Audio'
-                # remove at a later time
-                print("warning: SER is using an old mediaproxy.so module. "
-                      "Please upgrade it to get correct media type information.")
-            try:
-                stream = MediaStream(self, mediatype)
-            except MediaStreamError:
-                # check this. also at session creation
-                #for stream in self.mediaStreams:
-                    #stream.close()
-                raise RTPSessionError, "cannot find enough free ports for the media streams"
-            stream.setCaller(sinfo, visibleIP, asymmetric)
-            self.mediaStreams.append(stream)
-        self.oldCount = streamCount
-        self.needStreamUpdate = 1
+                # add a new stream
+                try:
+                    mediatype = sinfo[2]
+                except IndexError:
+                    mediatype = 'Audio'
+                    # remove at a later time
+                    print("warning: SER is using an old mediaproxy.so module. "
+                          "Please upgrade it to get correct media type information.")
+                try:
+                    stream = MediaStream(self, mediatype)
+                except MediaStreamError:
+                    # check this. also at session creation
+                    #for stream in self.mediaStreams:
+                        #stream.close()
+                    raise RTPSessionError, "cannot find enough free ports for the media streams"
+                if caller:
+                    stream.setCaller(sinfo, visibleIP, asymmetric)
+                else:
+                    stream.setCalled(sinfo, visibleIP, asymmetric)
+                #self.mediaStreams[i] = stream
+                # Mmmm, python doesn't seem to like random insertions
+                # into lists. append should always do what I want...
+                self.mediaStreams.append(stream)
         
-    def fillInCalledParty(self, request):
-        if not self.complete:
-            visibleIP = request.signalingIP
-            self.userAgents[1] = request.userAgent.decode('quopri')
-            self.encodedAgents[1] = request.userAgent
-            self.calledInfo = request.info
-            asymmetric = request.info.flags.asymmetric
-            streams = [s.split(':') for s in request.streams.split(',')]
-            for i in range(min(len(streams), len(self.mediaStreams))):
-                # Should we also re-set the mediatypes here? can the callee change them?
-                self.mediaStreams[i].setCalled(streams[i], visibleIP, asymmetric)
-            self.conversationStart = round(time.time())
-            self.complete = 1
-        if self.needStreamUpdate:
-            visibleIP = request.signalingIP
-            asymmetric = self.calledInfo.flags.asymmetric
-            streams = [s.split(':') for s in request.streams.split(',')]
-            streamCount = len(self.mediaStreams)
-            newCount = len(streams)
-            for i in range(self.oldCount, min(newCount, streamCount)):
-                # Should we also re-set the mediatypes here? can the callee change them?
-                self.mediaStreams[i].setCalled(streams[i], visibleIP, asymmetric)
-            del(self.oldCount)
-            self.needStreamUpdate = 0
-
     def close(self):
         for stream in self.mediaStreams:
             stream.close()
_______________________________________________
Users mailing list
[email protected]
http://openser.org/cgi-bin/mailman/listinfo/users