Hi,
I was having a few issues with mediaproxy when it came to re-invites and
call setup where the SDP does not get sent with the first INVITE. I
noticed this was due a couple couple of assumptions in mediaproxy which
don't always hold:
1) That SDP is always set up first by the caller (i.e. in the INVITE)
then updated by the called party (i.e. in the 200). This is not the
case. The first SDP can be sent in the 200 response by the called party
and the caller send their SDP in the ACK.
2) That RTP streams don't change once set up. This is wrong for reINVITEs.
This would cause problems when a user agent sent a reINVITE to redirect
RTP to on hold music for example - the RTP change would be ignored. I
think this had an impact on some fax setups as well, but I haven't
tested it. I found a patch to fix this by storing the streams in a hash
by media type, but that is a limiting assumption also (maybe I want 2
audi streams?).
The attached patch fixes this by changing the following:
1) This pathc allows a mediaproxy RTPSession to be set up by either the
caller or called party. A 'caller' argument is passed to the RTPSession
functions telling it if the caller or called party is doing the setup or
update.
2) The official mediaproxy does not even look at SDP details for streams
which are already set up. This patch lets every SDP packet update all of
the RTP streams for that party (either caller or called).
If you have issues with one-way-audio with media proxy after reINVITEs,
have calls the first SDP from the called party or are just plain
adventurous, please give it a go. It has been used in a production
environment for a bit over a month now. The patch was made against
mediaproxy 1.7.2, but will work against 1.8.0 as well since none of the
rtphander.py code changed.
Regards,
Jeff
diff -ur mediaproxy-1.7.2/modules/rtphandler.py mediaproxy-1.7.2-jeffw3/modules/rtphandler.py
--- mediaproxy-1.7.2/modules/rtphandler.py 2006-04-10 01:38:42.000000000 +0800
+++ mediaproxy-1.7.2-jeffw3/modules/rtphandler.py 2007-02-08 22:14:40.000000000 +0900
@@ -502,15 +502,18 @@
return "%s %s/%s/%s" % ((len(Sessions),) + tuple(Traffic))
elif cmd == 'version':
return release
- elif cmd == 'request':
- sessionId = request.sessionId
+ elif cmd == 'request' or cmd == 'lookup':
+ # callers are request, called are lookup
+ caller = (cmd == 'request')
+
try:
- session = Sessions[sessionId]
+ session = Sessions[request.sessionId]
except KeyError:
- if request.info.totag:
- ## Don't create a new session on re-INVITEs
- ## (we may decided not to use mediaproxy on the 1st INVITE)
- return None
+ #if request.info.totag:
+ # ## Don't create a new session on re-INVITEs
+ # ## (we may decided not to use mediaproxy on the 1st INVITE)
+ # print "request.info.totag"
+ # return None
## Create a brand new session
# todo: embed logic of encoding/decoding address to/from string into StructureInfo -Dan
if request.info.dispatcher is not None:
@@ -528,28 +531,25 @@
else:
request.info.dispatcher = address
try:
- session = RTPSession(request)
+ session = RTPSession(request, caller)
except RTPSessionError, why:
- raise InvalidRequestError, "cannot create session for '%s': %s" % (sessionId, why)
+ raise InvalidRequestError, "cannot create session for '%s': %s" % (request.sessionId, why)
else:
- session.updateStreams(request)
+ session.updateStreams(request, caller)
+
+ # Mmmmm, I'm a bit dubious about this section...
+ # I think the on hold status show be per RTP stream
ips = [address.split(':')[0] for address in request.streams.split(',')]
## A session can be put on hold only after both parties have sent their
## contact data. However they don't have to sign in on RTP before
## they can put the call on hold (they may have silence detection)
- if session.complete and '0.0.0.0' in ips:
+ if session.callerComplete and session.calledComplete and '0.0.0.0' in ips:
session.onHold = True
session.timeout = ProxyConfig.holdTimeout
else:
session.onHold = False
session.timeout = ProxyConfig.idleTimeout
- return session.endpointAddresses()
- elif cmd == 'lookup':
- try:
- session = Sessions[request.sessionId]
- except KeyError:
- return None
- session.fillInCalledParty(request)
+
# remove (or make it show in debug mode only)
#for stream in session.mediaStreams:
#stream.rtpStream.caller.show('rtp', compact=1)
@@ -558,6 +558,7 @@
#stream.rtcpStream.called.show('rtcp', compact=1)
# until here
return session.endpointAddresses()
+
elif cmd == 'delete':
try:
session = Sessions[request.sessionId]
@@ -629,6 +630,13 @@
self.__v.updateContacts()
self.name = name
+ def update(self, contactIP=None, rtpPort=0, visibleIP=None, local=0, asymmetric=0):
+ self.contactIP = contactIP
+ self.rtpPort = rtpPort
+ self.visibleIP = visibleIP
+ self.local = local
+ self.asymmetric = asymmetric
+
def getci(self): return self.__v.contactIP
def getvi(self): return self.__v.visibleIP
def getei(self): return self.__v.expectedIP
@@ -753,6 +761,7 @@
bytes = len(data) + 28 ## 28 is from IP+UDP headers
caller = self.caller
called = self.called
+
if address == caller.signedInAddress:
sender = 0
destination = called.address
@@ -791,6 +800,14 @@
self.signIn(called, address, data)
sender = 1
destination = caller.address
+ elif caller.signedInAddress and address[0]==caller.visibleIP and address[1]==caller.rtpPort:
+ self.signIn(caller, address, data)
+ sender = 0
+ destination = called.address
+ elif called.signedInAddress and address[0]==called.visibleIP and address[1]==called.rtpPort:
+ self.signIn(called, address, data)
+ sender = 1
+ destination = caller.address
else:
## Allow called party to change address (for example after a 183 from
## PSTN, if the call is not answered it is diverted to an announcement
@@ -875,10 +892,14 @@
self.rtcpStream.close()
def setCaller(self, address, visibleIP, asymmetric):
- if not self.rtpStream.caller:
- ip = address[0]
- port = int(address[1] or 0)
- local = self.session.localCaller
+ ip = address[0]
+ port = int(address[1] or 0)
+ local = self.session.localCaller
+ if self.rtpStream.caller:
+ # update the RTPPeer
+ self.rtpStream.caller.update(ip, port, visibleIP, local, asymmetric)
+ self.rtcpStream.caller.update(ip, port+1, visibleIP, local, asymmetric)
+ else:
rtpStream = self.rtpStream
rtcpStream = self.rtcpStream
rtpStream.caller = RTPPeer('caller', ip, port, visibleIP, local, asymmetric)
@@ -886,10 +907,14 @@
self.complete = rtpStream.called is not None
def setCalled(self, address, visibleIP, asymmetric):
- if not self.rtpStream.called:
- ip = address[0]
- port = int(address[1] or 0)
- local = self.session.localCalled
+ ip = address[0]
+ port = int(address[1] or 0)
+ local = self.session.localCalled
+ if self.rtpStream.called:
+ # update the RTPPeer
+ self.rtpStream.called.update(ip, port, visibleIP, local, asymmetric)
+ self.rtcpStream.called.update(ip, port+1, visibleIP, local, asymmetric)
+ else:
rtpStream = self.rtpStream
rtcpStream = self.rtcpStream
rtpStream.called = RTPPeer('called', ip, port, visibleIP, local, asymmetric)
@@ -951,52 +976,41 @@
class RTPSession(object):
- def __init__(self, request):
+ # Either the caller or called can initialise the RTPSession first
+ # The order can go:
+ # INVITE + SDP -> 200 + SDP -> ACK (Most common, caller initialises RTPSession)
+ # INVITE -> 200 + SDP -> ACK + SDP (Still valid, called initialises RTPSession)
+ def __init__(self, request, caller):
global Sessions
+ # These can be initialise by either end of the call
self._id = request.sessionId
self.callerDomain = request.callerDomain
self.calledDomain = request.calledDomain
self.localCaller = request.localCaller
self.localCalled = request.localCalled
- self.userAgents = [request.userAgent.decode('quopri'), 'Unknown']
- self.encodedAgents = [request.userAgent, 'Unknown']
+ self.startTime = round(time.time())
+ self.conversationStart = None
+ self.timeout = ProxyConfig.idleTimeout
+ # onHold should probably be a per mediaStream setting...
+ self.onHold = False
+ self.forceClosed = False
+ self.callerComplete = False
+ self.calledComplete = False
+ self.ended = False
+ # These get initialise in updateStreams
+ self.userAgents = ['Unknown', 'Unknown']
+ self.encodedAgents = ['Unknown', 'Unknown']
self.dispatcher = request.info.dispatcher
self.callerInfo = request.info
## We need this info until the called party signs in, else if a CANCEL
## from the caller comes before the OK it will raise an exception because
## there is no calledInfo to compute the stats when the session ends
self.calledInfo = request.info ## eventually make a copy of it
- visibleIP = request.signalingIP
- asymmetric = request.info.flags.asymmetric
- streams = [s.split(':') for s in request.streams.split(',')]
self.mediaStreams = []
- for sinfo in streams:
- try:
- mediatype = sinfo[2]
- except IndexError:
- mediatype = 'Audio'
- # remove at a later time
- print("warning: SER is using an old mediaproxy.so module. "
- "Please upgrade it to get correct media type information.")
- try:
- stream = MediaStream(self, mediatype)
- except MediaStreamError:
- for stream in self.mediaStreams:
- stream.close()
- raise RTPSessionError, "cannot find enough free ports for the media streams"
- stream.setCaller(sinfo, visibleIP, asymmetric)
- self.mediaStreams.append(stream)
+ self.updateStreams(request, caller)
if not self.mediaStreams:
raise RTPSessionError, "there are no media streams to process"
- self.startTime = round(time.time())
- self.conversationStart = None
- self.timeout = ProxyConfig.idleTimeout
- self.onHold = False
- self.forceClosed = False
- self.complete = 0 ## Will need the destination filled in as well
- self.needStreamUpdate = 0 ## If new streams are added on the fly
- self.ended = False
Sessions[self._id] = self
adr = self.endpointAddresses().split()
@@ -1066,58 +1080,64 @@
del(getit, getdt, getco, getty, getta, getdu)
del(getsu, getby, getpa, getcd, getsl, getin)
- def updateStreams(self, request):
+ def updateStreams(self, request, caller):
streams = [s.split(':') for s in request.streams.split(',')]
- streamCount = len(self.mediaStreams)
- if streamCount >= len(streams):
- return
visibleIP = request.signalingIP
- asymmetric = self.callerInfo.flags.asymmetric
- for sinfo in streams[streamCount:]:
+ asymmetric = None
+
+ # initialise the caller or called specific settings
+ if caller:
+ asymmetric = self.callerInfo.flags.asymmetric
+ if not self.callerComplete:
+ self.userAgents[0] = request.userAgent.decode('quopri')
+ self.encodedAgents[0] = request.userAgent
+ self.callerInfo = request.info
+ self.conversationStart = round(time.time())
+ self.callerComplete = True
+ else:
+ asymmetric = self.calledInfo.flags.asymmetric
+ if not self.calledComplete:
+ self.userAgents[1] = request.userAgent.decode('quopri')
+ self.encodedAgents[1] = request.userAgent
+ self.calledInfo = request.info
+ self.conversationStart = round(time.time())
+ self.calledComplete = True
+
+ # initialise the mediaStreams
+ for i in range(len(streams)):
+ sinfo = streams[i]
try:
- mediatype = sinfo[2]
+ stream = self.mediaStreams[i]
+ # update the stream
+ if caller:
+ stream.setCaller(sinfo, visibleIP, asymmetric)
+ else:
+ stream.setCalled(sinfo, visibleIP, asymmetric)
except IndexError:
- mediatype = 'Audio'
- # remove at a later time
- print("warning: SER is using an old mediaproxy.so module. "
- "Please upgrade it to get correct media type information.")
- try:
- stream = MediaStream(self, mediatype)
- except MediaStreamError:
- # check this. also at session creation
- #for stream in self.mediaStreams:
- #stream.close()
- raise RTPSessionError, "cannot find enough free ports for the media streams"
- stream.setCaller(sinfo, visibleIP, asymmetric)
- self.mediaStreams.append(stream)
- self.oldCount = streamCount
- self.needStreamUpdate = 1
+ # add a new stream
+ try:
+ mediatype = sinfo[2]
+ except IndexError:
+ mediatype = 'Audio'
+ # remove at a later time
+ print("warning: SER is using an old mediaproxy.so module. "
+ "Please upgrade it to get correct media type information.")
+ try:
+ stream = MediaStream(self, mediatype)
+ except MediaStreamError:
+ # check this. also at session creation
+ #for stream in self.mediaStreams:
+ #stream.close()
+ raise RTPSessionError, "cannot find enough free ports for the media streams"
+ if caller:
+ stream.setCaller(sinfo, visibleIP, asymmetric)
+ else:
+ stream.setCalled(sinfo, visibleIP, asymmetric)
+ #self.mediaStreams[i] = stream
+ # Mmmm, python doesn't seem to like random insertions
+ # into lists. append should always do what I want...
+ self.mediaStreams.append(stream)
- def fillInCalledParty(self, request):
- if not self.complete:
- visibleIP = request.signalingIP
- self.userAgents[1] = request.userAgent.decode('quopri')
- self.encodedAgents[1] = request.userAgent
- self.calledInfo = request.info
- asymmetric = request.info.flags.asymmetric
- streams = [s.split(':') for s in request.streams.split(',')]
- for i in range(min(len(streams), len(self.mediaStreams))):
- # Should we also re-set the mediatypes here? can the callee change them?
- self.mediaStreams[i].setCalled(streams[i], visibleIP, asymmetric)
- self.conversationStart = round(time.time())
- self.complete = 1
- if self.needStreamUpdate:
- visibleIP = request.signalingIP
- asymmetric = self.calledInfo.flags.asymmetric
- streams = [s.split(':') for s in request.streams.split(',')]
- streamCount = len(self.mediaStreams)
- newCount = len(streams)
- for i in range(self.oldCount, min(newCount, streamCount)):
- # Should we also re-set the mediatypes here? can the callee change them?
- self.mediaStreams[i].setCalled(streams[i], visibleIP, asymmetric)
- del(self.oldCount)
- self.needStreamUpdate = 0
-
def close(self):
for stream in self.mediaStreams:
stream.close()
_______________________________________________
Users mailing list
[email protected]
http://openser.org/cgi-bin/mailman/listinfo/users