So I implemented the "if on darwin, use a ridiculously deep playout buffer" hack, as described in my previous mail.

The result, appended below, plays out acceptably on both Mac and Linux. However, on Mac it *sends* RTP packets at the wrong rate.

That is, it sends 20-ms PCMU packets at a rate of 40 per second, instead of the 50 per second needed to keep up with real time (50 × 20 ms = 1 s). Bad.

I did some experimentation: SVN HEAD of Shtoom currently sends at the proper rate -- 50 packets per second -- but sometimes with short contents, i.e. fewer than 160 bytes of mulaw-encoded audio (one full 20-ms frame) in a packet.

With SVN HEAD of Shtoom on Mac, sending to SVN HEAD+playout.patch on Linux, I get high latency in long calls.

With SVN HEAD+playout.patch on both Mac and Linux, I get choppy, ugly audio, because the playout on Linux underruns frequently: it isn't receiving enough packets. Note that the playout patch also adds a small buffer to the media encoder that accumulates enough bytes for a standard media frame, so it guarantees there are no odd-sized PCMU packets.

My proprietary branch of Shtoom contains some refactored Mac audio code, and it sends PCMU packets at the correct rate with 160 bytes per packet. With that branch sending from the Mac, the Linux playout sounds good!

So: apply this patch if it works for you. Tomorrow I will work up a patch that contributes our Mac audio refactoring (largely the work of Donovan Preston).

Regards,

Zooko

Index: shtoom/scripts/shreadder.py
===================================================================
--- shtoom/scripts/shreadder.py (revision 1263)
+++ shtoom/scripts/shreadder.py (working copy)
@@ -1,11 +1,13 @@
 #!/usr/bin/env python
 
 
-import struct, math, sys
+import math, random, struct, sys
 sys.path.append(sys.path.pop(0))
 import shtoom.audio
 from shtoom.rtp import formats
 
+from shtoom.rtp.packets import RTPPacket
+
 app = None
 
 class Recorder:
@@ -13,6 +15,7 @@
         self._dev = dev
         self._play = play
         self._outfp = outfp
+        self.seq = random.randrange(0, 2**48)
         import sys
         if '-q' in sys.argv:
             self.quiet = True
@@ -36,15 +39,17 @@
 
     def sample(self, *args):
         try:
-            packet = self._dev.read()
+            sample = self._dev.read()
         except IOError:
             return
-        if not packet:
+        if not sample:
             print "no audio, skipping"
             return
         if self._outfp:
-            self._outfp.write(packet.data)
+            self._outfp.write(sample.data)
         if self._play:
+            packet = RTPPacket(0, self.seq, 0, data=sample.data, ct=sample.ct)
+            self.seq = (self.seq + 1) % 2**48
             self._dev.write(packet)
         #if len(packet.data) != 320:
         #    print "discarding bad length (%d) packet"%(len(packet.data))
@@ -60,6 +65,7 @@
     import sys
 
     dev = getAudioDevice()
+    print "hellloooo, dev: %s" % `dev`
     dev.close()
     dev.reopen()
     if len(sys.argv) > 1:
Index: shtoom/setup.py
===================================================================
--- shtoom/setup.py     (revision 1263)
+++ shtoom/setup.py     (working copy)
@@ -78,6 +78,7 @@
                 'shtoom.audio', 'shtoom.app', 'shtoom.doug', 'shtoom.compat' ],
     scripts = ['scripts/shtoomphone.py', 'scripts/shtam.py',
                'scripts/shmessage.py', 'scripts/shecho.py',
+              'scripts/shreadder.py'
               ],
     classifiers = [
        'Development Status :: 3 - Alpha',
Index: shtoom/shtoom/test/test_dtmf.py
===================================================================
--- shtoom/shtoom/test/test_dtmf.py     (revision 1263)
+++ shtoom/shtoom/test/test_dtmf.py     (working copy)
@@ -63,12 +63,12 @@
         for k in dtmf.dtmf2freq.keys():
             s = dtmf.dtmfGenerator(k, 320)
             s1, s2 = s[:320], s[320:]
-            codec.decode(codec.encode(s1))
-            codec.decode(codec.encode(s2))
-            s1 = codec.decode(codec.encode(s1))
-            s2 = codec.decode(codec.encode(s2))
+            codec.decode(codec.buffer_and_encode(s1))
+            codec.decode(codec.buffer_and_encode(s2))
+            s1 = codec.decode(codec.buffer_and_encode(s1))
+            s2 = codec.decode(codec.buffer_and_encode(s2))
             s = s1+s2
             digit = detect.detect(s)
             self.assertEquals(k, digit)
             silence = '\0'*320
-            codec.decode(codec.encode(silence))
+            codec.decode(codec.buffer_and_encode(silence))
Index: shtoom/shtoom/test/test_audiodev.py
===================================================================
--- shtoom/shtoom/test/test_audiodev.py (revision 1263)
+++ shtoom/shtoom/test/test_audiodev.py (working copy)
@@ -27,7 +27,6 @@
 
     def test_create(self):
         from shtoom.audio import getAudioDevice
-        from shtoom.audio.playout import _Playout
         from twisted.internet.task import LoopingCall
         ae = self.assertEquals
         a_ = self.assert_
@@ -40,15 +39,10 @@
         m = getAudioDevice(dummymod)
 
         ae(m.getDevice(), dev)
-        a_(m.audioLC is None)
         a_(m.playout is None)
         m.close()
-        a_(m.audioLC is None)
         a_(m.playout is None)
         m.reopen()
-        a_(isinstance(m.playout, _Playout))
-        a_(isinstance(m.audioLC, LoopingCall))
         m.close()
-        a_(m.audioLC is None)
         a_(m.playout is None)
-        ae(dev.ops, ['openDev', 'close', 'reopen', 'write', 'close'])
+        ae(dev.ops, ['openDev', 'close', 'reopen', 'close'])
Index: shtoom/shtoom/test/test_codecs.py
===================================================================
--- shtoom/shtoom/test/test_codecs.py   (revision 1263)
+++ shtoom/shtoom/test/test_codecs.py   (working copy)
@@ -42,7 +42,7 @@
         ae = self.assertEquals
         ar = self.assertRaises
         n = NullCodec()
-        ae(n.encode('frobozulate'), None)
+        ae(n._encode('frobozulate'), None)
         ae(n.decode('frobozulate'), None)
         c = Codecker()
         ar(ValueError, c.setDefaultFormat, PT_CN)
@@ -54,12 +54,12 @@
         ae(c.getDefaultFormat(), PT_RAW)
         p = PassthruCodec()
         ae = self.assertEquals
-        ae(p.encode('frobozulate'), 'frobozulate')
+        ae(p.buffer_and_encode('frobozulate'), 'frobozulate')
         ae(p.decode('frobozulate'), 'frobozulate')
         p = RTPPacket(0, 0, 0, 'farnarkling', ct=PT_RAW)
         ae(c.decode(p), 'farnarkling')
-        ae(c.encode('farnarkling').data, 'farnarkling')
-        ae(c.encode('farnarkling').header.pt, PT_RAW.pt)
+        ae(c.buffer_and_encode('farnarkling').data, 'farnarkling')
+        ae(c.buffer_and_encode('farnarkling').ct, PT_RAW)
 
     # XXX testing other codecs - endianness issues? crap.
 
@@ -70,9 +70,9 @@
         c = Codecker()
         c.setDefaultFormat(PT_PCMU)
         ae(c.getDefaultFormat(), PT_PCMU)
-        ae(len(c.encode(instr).data), 160)
-        ae(c.encode(instr).data, ulawout)
-        ae(c.encode(instr).header.ct, PT_PCMU)
+        ae(len(c.buffer_and_encode(instr).data), 160)
+        ae(c.buffer_and_encode(instr).data, ulawout)
+        ae(c.buffer_and_encode(instr).ct, PT_PCMU)
 
     def testGSMCodec(self):
         if codecs.gsm is None:
@@ -81,24 +81,26 @@
         c = Codecker()
         c.setDefaultFormat(PT_GSM)
         ae(c.getDefaultFormat(), PT_GSM)
-        p = c.encode(instr)
-        ae(len(p.data), 33)
-        ae(p.header.ct, PT_GSM)
+        s = c.buffer_and_encode(instr)
+        ae(len(s.data), 33)
+        ae(s.ct, PT_GSM)
+        p = RTPPacket(0, 0, 0, s.data, ct=PT_GSM)
         ae(len(c.decode(p)), 320)
-        ae(c.encode('\0'*32), None)
+        ae(c.buffer_and_encode('\0'*32), None)
 
     def testSpeexCodec(self):
-        if codecs.gsm is None:
+        if codecs.speex is None:
             raise unittest.SkipTest("no speex support")
         ae = self.assertEquals
         c = Codecker()
         c.setDefaultFormat(PT_SPEEX)
         ae(c.getDefaultFormat(), PT_SPEEX)
-        p = c.encode(instr)
-        ae(len(p.data), 40)
-        ae(p.header.ct, PT_SPEEX)
+        s = c.buffer_and_encode(instr)
+        ae(len(s.data), 40)
+        ae(s.ct, PT_SPEEX)
+        p = RTPPacket(0, 0, 0, s.data, ct=PT_SPEEX)
         ae(len(c.decode(p)), 320)
-        ae(c.encode('\0'*30), None)
+        ae(c.buffer_and_encode('\0'*30), None)
 
     def testMediaLayer(self):
         ae = self.assertEquals
Index: shtoom/shtoom/test/harness.py
===================================================================
--- shtoom/shtoom/test/harness.py       (revision 1263)
+++ shtoom/shtoom/test/harness.py       (working copy)
@@ -175,12 +175,12 @@
         self.actions.append('create')
         return defer.succeed(self.cookie)
 
-    def startSendingAndReceiving(self, remote):
+    def start(self, remote):
         from twisted.internet.task import LoopingCall
         self.actions.append('start')
         self.go = True
         self.echo = ''
-        self.LC = LoopingCall(self.nextpacket)
+        self.LC = LoopingCall(self.mic_event)
         self.LC.start(0.020)
 
     def stopSendingAndReceiving(self):
@@ -188,9 +188,9 @@
         self.LC.stop()
         self.echo = ''
 
-    def nextpacket(self):
+    def mic_event(self):
         from twisted.internet import reactor
-        self.echo = self.app.giveRTP(self.cookie)
+        self.echo = self.app.giveSample(self.cookie)
         if self.echo is not None:
             packet = self.echo
             reactor.callLater(0, lambda : self.app.receiveRTP(self.cookie,
Index: shtoom/shtoom/test/test_douglegs.py
===================================================================
--- shtoom/shtoom/test/test_douglegs.py (revision 1263)
+++ shtoom/shtoom/test/test_douglegs.py (working copy)
@@ -53,7 +53,7 @@
         l.setCookie('foo')
         ae(l.getCookie(), 'foo')
         # We should be connected to silence at this point
-        ae(l.leg_giveRTP(), None)
+        ae(l.leg_giveSample(), None)
 
     def test_legCallSetup(self):
         from shtoom.sip import Dialog
Index: shtoom/shtoom/test/test_callcontrol.py
===================================================================
--- shtoom/shtoom/test/test_callcontrol.py      (revision 1263)
+++ shtoom/shtoom/test/test_callcontrol.py      (working copy)
@@ -209,7 +209,7 @@
         self.actions.append('create')
         return defer.succeed(self.cookie)
 
-    def startSendingAndReceiving(self, remote):
+    def start(self, remote):
         self.actions.append('start')
         pass
 
Index: shtoom/shtoom/app/phone.py
===================================================================
--- shtoom/shtoom/app/phone.py  (revision 1263)
+++ shtoom/shtoom/app/phone.py  (working copy)
@@ -145,7 +145,7 @@
         if not self._currentCall:
             self._audio.reopen()
         log.msg("call Start %r %r"%(callcookie, remoteAddr))
-        self._rtp[callcookie].startSendingAndReceiving(remoteAddr)
+        self._rtp[callcookie].start(remoteAddr)
         self._currentCall = callcookie
         cb(callcookie)
 
@@ -193,15 +193,11 @@
         except IOError:
             pass
 
-    def giveRTP(self, callcookie):
+    def giveSample(self, callcookie):
         # Check that callcookie is the active call
         if self._currentCall != callcookie or self._muted:
             return None # comfort noise
-        packet = self._audio.read()
-        if packet is None:
-            return None
-        else:
-            return packet
+        return self._audio.read()
 
     def placeCall(self, sipURL):
         return self.sip.placeCall(sipURL)
Index: shtoom/shtoom/app/base.py
===================================================================
--- shtoom/shtoom/app/base.py   (revision 1263)
+++ shtoom/shtoom/app/base.py   (working copy)
@@ -99,7 +99,7 @@
     def receiveRTP(self, callcookie, payloadType, payloadData):
         raise NotImplementedError
 
-    def giveRTP(self, callcookie):
+    def giveSample(self, callcookie):
         raise NotImplementedError
 
     def getCookie(self):
Index: shtoom/shtoom/app/doug.py
===================================================================
--- shtoom/shtoom/app/doug.py   (revision 1263)
+++ shtoom/shtoom/app/doug.py   (working copy)
@@ -156,7 +156,7 @@
         md = remoteSDP.getMediaDescription('audio')
         ipaddr = md.ipaddr or remoteSDP.ipaddr
         remoteAddr = (ipaddr, md.port)
-        self._rtp[callcookie].startSendingAndReceiving(remoteAddr)
+        self._rtp[callcookie].start(remoteAddr)
         call = self._calls[callcookie]
         if call.dialog.getDirection() == "inbound":
             self._voiceapps[callcookie].va_callanswered()
@@ -192,10 +192,9 @@
         except IOError:
             pass
 
-    def giveRTP(self, callcookie):
+    def giveSample(self, callcookie):
         v = self._voiceapps[callcookie]
-        packet = v.va_giveRTP(callcookie)
-        return packet
+        return v.va_giveSample(callcookie)
 
     def placeCall(self, cookie, nleg, sipURL, fromURI=None):
         ncookie = self.getCookie()
Index: shtoom/shtoom/app/interfaces.py
===================================================================
--- shtoom/shtoom/app/interfaces.py     (revision 1263)
+++ shtoom/shtoom/app/interfaces.py     (working copy)
@@ -38,9 +38,8 @@
             packet, or something else.
         """
 
-    def giveRTP(self, callcookie):
-        """ The network layer wants an RTP packet to send. Return a 2-tuple
-            of (payloadType, payloadData)
+    def giveSample(self, callcookie):
+        """ The network layer wants to send an RTP packet. Return an instance of MediaSample or None.
         """
 
 class ApplicationUIInterface(Interface):
Index: shtoom/shtoom/doug/voiceapp.py
===================================================================
--- shtoom/shtoom/doug/voiceapp.py      (revision 1263)
+++ shtoom/shtoom/doug/voiceapp.py      (working copy)
@@ -52,8 +52,8 @@
     def va_selectDefaultFormat(self, ptlist, callcookie):
         return self.getLeg(callcookie).selectDefaultFormat(ptlist)
 
-    def va_giveRTP(self, callcookie):
-        return self.getLeg(callcookie).leg_giveRTP()
+    def va_giveSample(self, callcookie):
+        return self.getLeg(callcookie).leg_giveSample()
 
     def va_receiveRTP(self, packet, callcookie):
         return self.getLeg(callcookie).leg_receiveRTP(packet)
Index: shtoom/shtoom/doug/leg.py
===================================================================
--- shtoom/shtoom/doug/leg.py   (revision 1263)
+++ shtoom/shtoom/doug/leg.py   (working copy)
@@ -182,12 +182,8 @@
         if dtmf == self.__currentDTMFKey:
             self.__currentDTMFKey = None
 
-    def leg_giveRTP(self):
-        data = self.__connected.read()
-        if data:
-            packet = self.__converter.convertOutbound(data)
-            return packet
-        return None # comfort noise
+    def leg_giveSample(self):
+        return self.__converter.convertOutbound(self.__connected.read())
 
     def leg_receiveRTP(self, packet):
         data = self.__converter.convertInbound(packet)
Index: shtoom/shtoom/audio/converters.py
===================================================================
--- shtoom/shtoom/audio/converters.py   (revision 1263)
+++ shtoom/shtoom/audio/converters.py   (working copy)
@@ -2,9 +2,8 @@
 from shtoom.rtp.formats import PT_PCMU, PT_GSM, PT_SPEEX, PT_DVI4, PT_RAW
 from shtoom.rtp.formats import PT_PCMA, PT_ILBC
 from shtoom.rtp.formats import PT_CN, PT_xCN, AudioPTMarker
-from shtoom.rtp.packets import RTPPacket
 from shtoom.avail import codecs
-from shtoom.audio.playout import Playout
+from shtoom.audio import playout
 from twisted.python import log
 import struct
 from shtoom.lwc import Interface, implements
@@ -14,7 +13,14 @@
 except ImportError:
     audioop = None
 
+class MediaSample:
+    def __init__(self, ct, data):
+        self.ct = ct
+        self.data = data
 
+    def __repr__(self):
+        return "<%s/%s, %s>" % (self.__class__.__name__, self.ct, `self.data`,)
+
 class NullConv:
     # Should be refactored away
     def __init__(self, device):
@@ -60,17 +66,27 @@
         raise ValueError("insane endian-check result %r"%(p))
 
 class IAudioCodec:
-    def encode(bytes):
+    def buffer_and_encode(self, bytes):
         "encode bytes, a string of audio"
-    def decode(bytes):
+    def decode(self, bytes):
         "decode bytes, a string of audio"
 
 class _Codec:
     "Base class for codecs"
     implements(IAudioCodec)
+    def __init__(self, samplesize):
+        self.samplesize = samplesize
+        self.b = ''
 
+    def buffer_and_encode(self, bytes):
+        self.b += bytes
+        if len(self.b) >= self.samplesize:
+            sample, self.b = self.b[:self.samplesize], self.b[self.samplesize:]
+            return self._encode(sample)
+
 class GSMCodec(_Codec):
     def __init__(self):
+        _Codec.__init__(self, 320)
         if isLittleEndian():
             self.enc = codecs.gsm.gsm(codecs.gsm.LITTLE)
             self.dec = codecs.gsm.gsm(codecs.gsm.LITTLE)
@@ -78,11 +94,7 @@
             self.enc = codecs.gsm.gsm(codecs.gsm.BIG)
             self.dec = codecs.gsm.gsm(codecs.gsm.BIG)
 
-    def encode(self, bytes):
-        if len(bytes) != 320:
-            log.msg("GSM: short read on encode, %d != 320"%len(bytes), 
-                                                            system="codec")
-            return None
+    def _encode(self, bytes):
         return self.enc.encode(bytes)
 
     def decode(self, bytes):
@@ -98,12 +110,9 @@
     def __init__(self):
         self.enc = codecs.speex.new(8)
         self.dec = codecs.speex.new(8)
+        _Codec.__init__(self, 320)
 
-    def encode(self, bytes, unpack=struct.unpack):
-        if len(bytes) != 320:
-            log.msg("speex: short read on encode, %d != 320"%len(bytes), 
-                                                            system="codec")
-            return None
+    def _encode(self, bytes, unpack=struct.unpack):
         frames = list(unpack('160h', bytes))
         return self.enc.encode(frames)
 
@@ -119,10 +128,10 @@
 class MulawCodec(_Codec):
     "A codec for mulaw encoded audio (e.g. G.711U)"
 
-    def encode(self, bytes):
-        if len(bytes) != 320:
-            log.msg("mulaw: short read on encode, %d != 320"%len(bytes), 
-                                                            system="codec")
+    def __init__(self):
+        _Codec.__init__(self, 320)
+
+    def _encode(self, bytes):
         return audioop.lin2ulaw(bytes, 2)
 
     def decode(self, bytes):
@@ -134,10 +143,10 @@
 class AlawCodec(_Codec):
     "A codec for alaw encoded audio (e.g. G.711A)"
 
-    def encode(self, bytes):
-        if len(bytes) != 320:
-            log.msg("alaw: short read on encode, %d != 320"%len(bytes), 
-                                                            system="codec")
+    def __init__(self):
+        _Codec.__init__(self, 320)
+
+    def _encode(self, bytes):
         return audioop.lin2alaw(bytes, 2)
 
     def decode(self, bytes):
@@ -149,7 +158,10 @@
 class NullCodec(_Codec):
     "A codec that consumes/emits nothing (e.g. for confort noise)"
 
-    def encode(self, bytes):
+    def __init__(self):
+        _Codec.__init__(self, 1)
+
+    def _encode(self, bytes):
         return None
 
     def decode(self, bytes):
@@ -157,7 +169,9 @@
 
 class PassthruCodec(_Codec):
     "A codec that leaves it's input alone"
-    encode = decode = lambda self, bytes: bytes
+    def __init__(self):
+        _Codec.__init__(self, 1)
+    buffer_and_encode = decode = lambda self, bytes: bytes
 
 class Codecker:
     def __init__(self):
@@ -165,6 +179,7 @@
         self.codecs[PT_CN] = NullCodec()
         self.codecs[PT_xCN] = NullCodec()
         self.codecs[PT_RAW] = PassthruCodec()
+        assert codecs.mulaw
         if codecs.mulaw is not None:
             self.codecs[PT_PCMU] = MulawCodec()
         if codecs.alaw is not None:
@@ -192,17 +207,17 @@
         else:
             raise ValueError("Can't handle codec %r"%format)
 
-    def encode(self, bytes):
-        "Accepts audio as bytes, emits an RTPPacket"
+    def buffer_and_encode(self, bytes):
+        "Accepts audio as bytes, emits a MediaSample or None"
         if not bytes:
             return None
         codec = self.codecs.get(self.format)
         if not codec:
             raise ValueError("can't encode format %r"%self.format)
-        encaudio = codec.encode(bytes)
+        encaudio = codec.buffer_and_encode(bytes)
         if not encaudio:
             return None
-        return RTPPacket(0, 0, 0, encaudio, ct=self.format)
+        return MediaSample(self.format, encaudio)
 
     def decode(self, packet):
         "Accepts an RTPPacket, emits audio as bytes"
@@ -218,11 +233,10 @@
     """ The MediaLayer sits between the network and the raw
         audio device. It converts the audio to/from the codec on
         the network to the format used by the lower-level audio
-        devices (16 bit signed ints at 8KHz).
+        devices (16 bit signed ints at an integer multiple of 8KHz).
     """
     def __init__(self, device, defaultFormat=PT_PCMU, *args, **kwargs):
         self.playout = None
-        self.audioLC = None
         self.codecker = Codecker()
         self.codecker.setDefaultFormat(defaultFormat)
         NullConv.__init__(self, device, *args, **kwargs)
@@ -242,37 +256,27 @@
 
     def read(self):
         bytes = self._d.read()
-        return self.codecker.encode(bytes)
+        return self.codecker.buffer_and_encode(bytes)
 
     def write(self, packet):
         if self.playout is None:
             log.msg("write before reopen, discarding")
             return 0
         audio = self.codecker.decode(packet)
-        if not audio:
-            self.playout.write('', packet)
+        if audio:
+            return self.playout.write(audio, packet.header.seq)
+        else:
+            self.playout.write('', packet.header.seq)
             return 0
-        return self.playout.write(audio, packet)
 
     def reopen(self):
-        from twisted.internet.task import LoopingCall
         if self.playout is not None:
             log.msg("duplicate ACK? playout already started")
             return
         NullConv.reopen(self)
-        self.playout = Playout()
-        log.msg("initialising playout %r"%(self.playout,))
-        self.audioLC = LoopingCall(self.playoutAudio)
-        self.audioLC.start(0.020)
+        self.playout = playout.Playout(self)
 
-    def playoutAudio(self):
-        au = self.playout.read()
-        self._d.write(au)
-
     def close(self):
-        if self.audioLC is not None:
-            self.audioLC.stop()
-            self.audioLC = None
         self.playout = None
         NullConv.close(self)
 
@@ -282,7 +286,7 @@
     def __init__(self, defaultFormat=PT_PCMU, *args, **kwargs):
         self.codecker = Codecker()
         self.codecker.setDefaultFormat(defaultFormat)
-        self.convertOutbound = self.codecker.encode
+        self.convertOutbound = self.codecker.buffer_and_encode
         self.convertInbound = self.codecker.decode
         if not kwargs.get('device'):
             kwargs['device'] = None
Index: shtoom/shtoom/audio/playout.py
===================================================================
--- shtoom/shtoom/audio/playout.py      (revision 1263)
+++ shtoom/shtoom/audio/playout.py      (working copy)
@@ -1,112 +1,159 @@
+# from the Python Standard Library
+from bisect import bisect
+import sys
+
+# from the Twisted library
+from twisted.internet import reactor
 from twisted.python import log
 
-DEBUG=False
+# TODO's
+#  * think about persistent clock skew (the RTP book by Perkins has an extensive discussion and sophisticated algorithm for this)
+#  * measure (callLater lag, jitter, clock skew,)
+#  * add more livetests
+#  * minimize playout buffer.  The only reason for the large size of PLAYOUT_BUFFER_SECONDS -- currently 0.8 seconds (!!!) on Mac -- is that we didn't have a precise way to get our Python called *just* before the audio output FIFO underran, on Mac.  Such a precise way has been added on Mac thanks to Bob Ippolito.  The thing to do on Mac is use the callback Bob provided, which means "the audio output buffer is on the verge of running dry", to move the next packet's worth of audio from the jitter buffer to the output device FIFO.  That is: call "_consider_playing_out_sample()" from the audio device's "I'm about to run dry" callback, instead of from a reactor.callLater().  Hopefully similar things could be done on Linux and w32 as well.  This *might* trigger a cleanup of this code, because the event that comes from the audio device saying "okay I've just about finished playing those 20 ms you gave me" and the event that comes from "hey a network packet arrived with some audio data in it" could probably be completely separate from each other and each could probably be simpler...
 
-class _Playout:
-    "Base class for playout. should be an interface - later"
+if 'darwin' in sys.platform.lower():
+    PLAYOUT_BUFFER_SECONDS=0.8 # stuff up to this many seconds worth of packets into the audio output buffer
+else:
+    PLAYOUT_BUFFER_SECONDS=0.03 # stuff up to this many seconds worth of packets into the audio output buffer
+JITTER_BUFFER_SECONDS=0.8 # store up to this many seconds worth of packets in the jitter buffer before switching to playout mode
+CATCHUP_TRIGGER_SECONDS=1.4 # if we have this many or more seconds worth of packets, drop the oldest ones in order to catch up
 
-class BrainDeadPlayout(_Playout):
-    # We keep two packets of audio. self.b1 is the one "to be read"
-    # while self.b2 is the pending one. No notice is taken of the
-    # RTP timestamps, sequence numbers, or, really, anything else.
+EPSILON=0.0001
 
-    def __init__(self):
-        self.b1 = ''
-        self.b2 = ''
+import time
 
-    def write(self, bytes, packet=None):
-        if not isinstance(bytes, basestring):
-            raise ValueError("playout got %s instead of bytes"%(type(bytes)))
-        if not self.b2:
-            # underrun
-            self.b2 = bytes
-            return len(bytes)
-        else:
-            # overrun! log.msg, maybe?
-            self.b1 = self.b2
-            self.b2 = bytes
-            return len(bytes)
+DEBUG=False
+# DEBUG=True
 
-    def read(self):
-        if self.b1 is not None:
-            bytes, self.b1, self.b2 = self.b1, self.b2, ''
-            return bytes
-        else:
-            return ''
+def is_run(packets, i, runsecs):
+    """
+    Returns True iff packets contains a run of sequential packets starting at index i and extending at least runsecs seconds in aggregate length.
+    """
+    runbytes = runsecs * 16000
+    firstseq = packets[i][0]
+    runbytessofar = 0
+    i2 = 0
+    while runbytessofar < runbytes:
+        if len(packets) <= i + i2:
+            return False
+        if packets[i + i2][0] != firstseq + i2:
+            return False
+        runbytessofar += len(packets[i + i2][1])
+        i2 += 1
+    return True
 
-# Basically it introduces an n chunks long queue (the n being adjustable
-# by changing self.backlog in the constructor). This queue is used for
-# resequencing packets in the correct order. The size of the queue can
-# decrease when no packets are received - eventually returning silence
-# sound chunks if the queue grows empty.
-# The size can also increase if too many packets are received in an
-# interval. When the queue grows too large (currently backlogsize + 2)
-# packets will be discarded. This strategy causes the number of discards
-# to exactly match the number of dummy silence packets, and works quite
-# well - assuming of course that the other RTP peer is behaving correctly
-# and not generating an incorrect number of sound packets.
-# I'd like to expand this functionality to support autoadjustment of the
-# backlog size - fairly simple to do.
+class Playout:
+    """
+    Theory of operation: you have two modes: "playout" mode and "refill" mode.
+    When you are in playout mode then you play out sequential audio packets from
+    your jitter buffer to the audio output device's FIFO as needed.  Do the
+    obvious right thing with out-of-order packets.
 
-class BacklogPlayout(_Playout):
-    def __init__(self):
-        self.backlog = 3
-        self.queue = ['\0'*320]*(self.backlog)
-        self.expect_seq = None
+    You switch to refill mode when you have a buffer underrun -- that is not
+    enough in-order data came in from the network so the audio output device
+    (i.e., the speaker) ran dry.  When you are in refill mode, you don't send
+    any packets to the audio output device, but instead hoard them until you
+    have JITTER_BUFFER_SECONDS worth of sequential, in-order data ready, and
+    then switch to playout mode.
+
+    There's an added complication because this code doesn't currently have a
+    nice clean way to say "write this 20 milliseconds worth of audio to the
+    output device\'s FIFO, and then run the following method *just* before those
+    20 milliseconds are all used up".  This complication is called the "playout
+    buffer", and it is a way to stuff way more than 20 milliseconds worth of
+    audio into the audio output device's FIFO, so that we'll get a chance to add
+    more packets before it underruns, even when reactor.callLater() sometimes
+    gets called 110 milliseconds later than we wanted.  This happens on Mac.
+    See TODO item about playout buffer in comments above.
+    """
+    def __init__(self, medialayer):
+        self.medialayer = medialayer
+        self.b = [] # (seqno, bytes,)
+        self.s = 0 # the sequence number of the (most recent) packet which has gone to the output device
+        self.drytime = None # the time at which the audio output device will have nothing to play
+        self.refillmode = True # we start in refill mode
+        self.nextcheckscheduled = None
+        self.st = time.time() 
+
+    def _schedule_next_check(self, delta, t=time.time):
+        if self.nextcheckscheduled:
+            return
+        self.nextcheckscheduled = reactor.callLater(delta, self._do_scheduled_check)
         if DEBUG:
-            self.count = 0
+            log.msg("xxxxx scheduling next check. now: %0.3f, then: %0.3f, drytime: %0.3f" % (t() - self.st, self.nextcheckscheduled.getTime() - self.st, self.drytime - self.st,))
 
-    def write(self, bytes, packet=None):
-        if not isinstance(bytes, basestring):
-            raise ValueError("playout got %s instead of bytes"%(type(bytes)))
+    def _do_scheduled_check(self, t=time.time):
+        if DEBUG:
+            log.msg("xxxxxxx doing scheduled check at %0.3f == %0.3f late" % (t() - self.st, t()-self.nextcheckscheduled.getTime()))
+        self.nextcheckscheduled = None
+        self._consider_playing_out_sample()
 
-        if self.expect_seq is None:
-            self.expect_seq = packet.header.seq # First packet. Initialize seq
+    def _consider_playing_out_sample(self, t=time.time, newsampseqno=None):
+        if not self.b or self.b[0][0] != (self.s + 1):
+            # We don't have a packet ready to play out.
+            if t() >= self.drytime:
+                self._switch_to_refill_mode()
+            return
 
-        backlog = len(self.queue)
-        if packet.header.seq == self.expect_seq:
-            self.expect_seq = packet.header.seq+1
-            if backlog < self.backlog+1:
-                self.queue.append(bytes)
-            elif DEBUG:
-                log.msg("BacklogPlayout discarding")
-        else:
-            offset = packet.header.seq-self.expect_seq
-            if offset > 0 and offset < 3:
-                # Fill with empty packets
-                self.queue += [None]*offset
-                self.queue.append(bytes)
-                self.expect_seq = packet.header.seq+1
-                if DEBUG:
-                    log.msg("BacklogPlayout got hole at %d"%offset)
-            elif offset < 0 and offset > -backlog:
-                if self.queue[offset] != None:
-                    if DEBUG:
-                        log.msg("BacklogPlayout discarding duplicate packet")
-                else:
-                    if DEBUG:
-                        log.msg("BacklogPlayout got out of order packets")
-                    self.queue[offset] = bytes
-            return len(bytes)
+        if self.drytime and (t() >= self.drytime) and ((not newsampseqno) or (newsampseqno != (self.s + 1))):
+            log.msg("xxxxxxxxx output device ran dry unnecessarily! now: %0.3f, self.drytime: %s, nextseq: %s, newsampseqno: %s" % (t() - self.st, self.drytime - self.st, self.b[0][0], newsampseqno,))
 
-    def read(self):
-        available = len(self.queue)
+        # While the output device would run dry within PLAYOUT_BUFFER_SECONDS from now, then play out another sample.
+        while (t() + PLAYOUT_BUFFER_SECONDS >= self.drytime) and self.b and self.b[0][0] == (self.s + 1):
+            (seq, bytes,) = self.b.pop(0)
+            self.medialayer._d.write(bytes)
+            self.s = seq
+            packetlen = len(bytes) / float(16000)
+            if self.drytime is None:
+                self.drytime = t() + packetlen
+            else:
+                self.drytime = max(self.drytime + packetlen, t() + packetlen)
+            if DEBUG:
+                log.msg("xxxxx %0.3f played %s, playbuflen ~= %0.3f, 
jitterbuf: %d:%s" % (t() - self.st, seq, self.drytime and (self.drytime - t()) 
or 0, len(self.b), map(lambda x: x[0], self.b),))
+
+        # If we filled the playout buffer then come back and consider 
refilling it after it has an open slot big enough to hold the next packet.  (If 
we didn't just fill it then when the next packet comes in from the network 
self.write() will invoke self._consider_playing_out_sample().)
+        if self.b and self.b[0][0] == (self.s + 1):
+            # Come back and consider playing out again after we've played out 
an amount of audio equal to the next packet.
+            self._schedule_next_check(len(self.b[0][1]) / float(16000) + 
EPSILON)
+
+
+    def _switch_to_refill_mode(self):
+        self.refillmode = True
+        self._consider_switching_to_play_mode()
+
+    def _consider_switching_to_play_mode(self):
+        # If we have enough sequential packets ready, then we'll make them be 
the current packets and switch to play mode.
+        for i in range(len(self.b) - 1):
+            if is_run(self.b, i, JITTER_BUFFER_SECONDS):
+                self.b = self.b[i:]
+                self.s = self.b[0][0] - 1 # prime it for the next packet
+                self.refillmode = False
+                self._consider_playing_out_sample()
+                return
+
+    def write(self, bytes, seq, t=time.time):
+        assert isinstance(bytes, basestring)
+
+        if not bytes:
+            return 0
+
+        i = bisect(self.b, (seq, bytes,))
+        if i > 0 and self.b[i-1][0] == seq:
+            log.msg("xxx duplicate packet %s" % seq)
+            return
+
+        self.b.insert(i, (seq, bytes,))
+
         if DEBUG:
-            self.count += 1
-            if self.count%10==0:
-                print available
+            log.msg("xxxxx %0.3f added  %s, playbuflen ~= %0.3f, jitterbuf: 
%d:%s" % (t() - self.st, seq, self.drytime and (self.drytime - t()) or 0, 
len(self.b), map(lambda x: x[0], self.b),))
+        if self.refillmode:
+            self._consider_switching_to_play_mode()
+        else:
+            self._consider_playing_out_sample(newsampseqno=seq)
+            if self.b and (self.b[0][0] == self.s + 1) and is_run(self.b, 0, 
CATCHUP_TRIGGER_SECONDS):
+                (seq, bytes,) = self.b.pop(0) # catch up
+                log.msg("xxxxxxx catchup! dropping %s" % seq)
+                self.s = self.b[0][0] - 1 # prime it for the next packet
 
-        if available:
-            data = self.queue.pop(0)
-            if not data: return '\0'*320
-            return data
-        elif DEBUG:
-            log.msg("BacklogPlayout audio underrun")
-        # Note that we should really be doing something better than silence!
-        return '\0'*320
 
-
-Playout = BrainDeadPlayout
-# BacklogPlayout is causing audio to "just die" for people a couple of
-# minutes into the call. XXX to fix
-#Playout = BacklogPlayout
Index: shtoom/shtoom/rtp/protocol.py
===================================================================
--- shtoom/shtoom/rtp/protocol.py       (revision 1263)
+++ shtoom/shtoom/rtp/protocol.py       (working copy)
@@ -3,7 +3,7 @@
 # $Id: rtp.py,v 1.40 2004/03/07 14:41:39 anthony Exp $
 #
 
-import struct, random, os, md5, socket
+import struct, sys, random, os, md5, socket
 from time import sleep, time
 
 from twisted.internet import reactor, defer
@@ -13,9 +13,11 @@
 
 from shtoom.rtp.formats import SDPGenerator, PT_CN, PT_xCN, PT_NTE
 from shtoom.rtp.packets import RTPPacket, parse_rtppacket
+from shtoom.audio.converters import MediaSample
 
-TWO_TO_THE_16TH = 2<<16
-TWO_TO_THE_32ND = 2<<32
+TWO_TO_THE_16TH = 2**16 # RTP sequence numbers are 16 bits wide
+TWO_TO_THE_32ND = 2**32 # RTP timestamps are 32 bits wide; self.ts wraps here
+TWO_TO_THE_48TH = 2**48 # self.seq wraps here; RTPPacket reduces it mod 2**16
 
 from shtoom.rtp.packets import NTE
 
@@ -29,13 +31,21 @@
 
     _cbDone = None
 
-    rtpParser = None
-
+    Done = False # mic_event() stops the send loop once this is true
     def __init__(self, app, cookie, *args, **kwargs):
         self.app = app
         self.cookie = cookie
         self._pendingDTMF = []
         #DatagramProtocol.__init__(self, *args, **kwargs)
+        self.ptdict = {}
+        self.seq = self.genRandom(bits=16)
+        self.s = None # send-rate stats: time of the first PCMU packet sent
+        self.f = None # send-rate stats: self.seq right after the first PCMU send
+        self.sr = None # receive-rate stats: time of the first PCMU packet received
+        self.fr = None # receive-rate stats: seq of the first PCMU packet received
+        self.ts = self.genInitTS()
+        self.ssrc = self.genSSRC()
+        self.sending = False # set by start(); mic_event() transmits only while true
 
     def getSDP(self, othersdp=None):
         sdp = SDPGenerator().getSDP(self)
@@ -232,9 +242,17 @@
         packet = RTPPacket(self.ssrc, self.seq, self.ts, data, pt=pt, xhdrtype=xhdrtype, xhdrdata=xhdrdata)
 
         self.seq += 1
-        self.transport.write(packet.netbytes(), self.dest)
+        # Note that seq no gets modded by 2^16 in RTPPacket, so it doesn't need to be wrapped at 16 bits here.
+        if self.seq >= TWO_TO_THE_48TH:
+            self.seq = self.seq - TWO_TO_THE_48TH
 
+        if hasattr(self, 'dest'):
+            self.transport.write(packet.netbytes(), self.dest)
+        else:
+            print "We attempted to send a packet before start() was called.  This can happen because we are attempting to send a packet in response to an incoming packet.  The outgoing packet will be dropped, exactly as if it had been successfully sent and then disappeared into the void.  Our protocol is required to deal with this possibility anyway, since we use an unreliable transport.  As soon as the SIP ACK message arrives then start() will be called and after that attempts to send will work.  pt: %s, data: %s, xhdrtype: %s, xhdrdata: %s" % (pt, `data`, xhdrtype, `xhdrdata`,)
+
     def _send_cn_packet(self):
+        assert hasattr(self, 'dest'), "It is required that start() is called before _send_cn_packet() is called.  This requirement has not been met."
         # PT 13 is CN.
         if self.ptdict.has_key(PT_CN):
             cnpt = PT_CN.pt
@@ -248,35 +266,24 @@
 
         self._send_packet(cnpt, chr(127))
 
-    def startSendingAndReceiving(self, dest, fp=None):
+    def start(self, dest, fp=None):
         self.dest = dest
-        self.prevInTime = self.prevOutTime = time()
-        self.sendFirstData()
 
-    def sendFirstData(self):
-        self.seq = self.genRandom(bits=16)
-        self.ts = self.genInitTS()
-        self.ssrc = self.genSSRC()
-        self.sample = None
-        self.packets = 0
-        self.Done = 0
-        self.sent = 0
-        try:
-            self.sample = self.app.giveRTP(self.cookie)
-        except IOError: # stupid sound devices
-            self.sample = None
-            pass
-        self.LC = LoopingCall(self.nextpacket)
-        self.LC.start(0.020)
+        self.Done = False
+        self.sending = True # mic_event() transmits only while this is set
+        if hasattr(self.transport, 'connect'):
+            self.transport.connect(*self.dest)
+
         # Now send a single CN packet to seed any firewalls that might
         # need an outbound packet to let the inbound back.
         self._send_cn_packet()
-        if hasattr(self.transport, 'connect'):
-            self.transport.connect(*self.dest)
 
-    def datagramReceived(self, datagram, addr):
-        # XXX check for late STUN packets here. see, e.g datagramReceived in sip.py
+        self.LC = LoopingCall(self.mic_event)
+        self.LC.start(0.020)
+
+    def datagramReceived(self, datagram, addr, t=time):
         packet = parse_rtppacket(datagram)
+
         try:
             packet.header.ct = self.ptdict[packet.header.pt]
         except KeyError:
@@ -284,8 +291,22 @@
                 # Argh nonstandardness suckage
                 packet.header.pt = 13
                 packet.header.ct = self.ptdict[packet.header.pt]
-        if packet:
-            self.app.receiveRTP(self.cookie, packet)
+            else:
+                log.msg("received packet with unknown PT %s" % packet.header.pt)
+                # XXX This could overflow the log.  Ideally we would have a "previous message repeated N times" feature...  --Zooko 2004-10-18
+                return # drop the packet on the floor
+        self.app.receiveRTP(self.cookie, packet)
+        if packet.header.pt == 0: # PCMU: print receive-rate stats every 1000th seq
+            if self.sr is None:
+                self.sr = t()
+                self.fr = packet.header.seq
+            if packet.header.seq % 1000 == 0:
+                now = t()
+                delt = now - self.sr
+                ps = packet.header.seq - self.fr
+                if ps < 0:
+                    ps += 2**16
+                print "received %s packets in %s secs, packets/sec: %s" % (ps, delt, delt and ps/delt,) # delt is 0.0 if the first packet's seq is a multiple of 1000
 
     def genSSRC(self):
         # Python-ish hack at RFC1889, Appendix A.6
@@ -336,7 +357,7 @@
             hex = m.hexdigest()
         return int(hex[:bits//4],16)
 
-    def nextpacket(self, n=None, f=None, pack=struct.pack):
+    def mic_event(self, n=None, f=None, pack=struct.pack, t=time):
         if self.Done:
             if self.LC is not None:
                 self.LC.stop()
@@ -344,21 +365,33 @@
             if self._cbDone:
                 self._cbDone()
             return
-        self.ts += 160
-        self.packets += 1
         # We need to keep track of whether we were in silence mode or not -
         # when we go from silent->talking, set the marker bit. Other end
         # can use this as an excuse to adjust playout buffer.
-        if self.sample is not None:
-            pt = self.ptdict[self.sample.header.ct]
-            self._send_packet(pt, self.sample.data)
-            self.sent += 1
-            self.sample = None
-        elif (self.packets - self.sent) % 100 == 0:
-            self._send_cn_packet()
+        sample = self.app.giveSample(self.cookie)
+        while sample:
+            if self.sending:
+                pt = self.ptdict[sample.ct]
+                self._send_packet(pt, sample.data)
+                if pt == 0: # PCMU: print send-rate stats every 1000th seq
+                    if self.s is None:
+                        self.s = t()
+                        self.f = self.seq
+                    if self.seq % 1000 == 0:
+                        ps = self.seq - self.f
+                        if ps < 0:
+                            ps += 2**16
+                        now = t()
+                        delt = now - self.s
+                        print "sent %s packets in %s seconds.  packets/sec: %s" % (ps, delt, delt and ps/delt,) # delt is 0.0 if the first send lands on a multiple of 1000
+                self.ts += 160
+                # Wrapping
+                if self.ts >= TWO_TO_THE_32ND:
+                    self.ts = self.ts - TWO_TO_THE_32ND
+            sample = self.app.giveSample(self.cookie)
 
         # Now send any pending DTMF keystrokes
-        if self._pendingDTMF:
+        if self.sending and self._pendingDTMF:
             payload = self._pendingDTMF[0].getPayload(self.ts)
             if payload:
                 ntept = self.ptdict.get(PT_NTE)
@@ -366,14 +399,3 @@
                     self._send_packet(pt=ntept, data=payload)
                 if self._pendingDTMF[0].isDone():
                     self._pendingDTMF = self._pendingDTMF[1:]
-        try:
-            self.sample = self.app.giveRTP(self.cookie)
-        except IOError:
-            pass
-
-        # Wrapping
-        if self.seq >= TWO_TO_THE_16TH:
-            self.seq = self.seq - TWO_TO_THE_16TH
-
-        if self.ts >= TWO_TO_THE_32ND:
-            self.ts = self.ts - TWO_TO_THE_32ND
Index: shtoom/shtoom/upnp.py
===================================================================
--- shtoom/shtoom/upnp.py       (revision 1263)
+++ shtoom/shtoom/upnp.py       (working copy)
@@ -69,10 +69,10 @@
                                         system='UPnP')
         if status == "200":
             self.gotSearchResponse = True
-            self.handleSearchResponse(message)
             if self.upnpTimeout:
                 self.upnpTimeout.cancel()
                 self.upnpTimeout = None
+            self.handleSearchResponse(message) # handled only after the pending search timeout above has been cancelled
 
     def handleSearchResponse(self, message):
         import urlparse
Index: shtoom/shtoom/interfaces.py
===================================================================
--- shtoom/shtoom/interfaces.py (revision 1263)
+++ shtoom/shtoom/interfaces.py (working copy)
@@ -79,8 +79,8 @@
         """ Returns the IP address for this end of the RTP connection.
         """
 
-    def startSendingAndReceiving(self):
-        """ Start the timer loop that delivers and receives packets.
+    def start(self):
+        """ Start sending: send one initial CN packet, then start the timer loop that sends packets of audio.
         """
 
     def stopSendingAndReceiving(self):
Index: CocoaShtoom/CoreAudioDevice.py
===================================================================
--- CocoaShtoom/CoreAudioDevice.py      (revision 1263)
+++ CocoaShtoom/CoreAudioDevice.py      (working copy)
@@ -15,15 +15,18 @@
         return bytes
 
     def write(self, samples):
+        print "zzz %s.write(%s)" % (self, len(samples),) # NOTE(review): looks like a debug trace; runs on every write() call
         self.audioMonitor.queueAudioSampleData_(
             NSData.dataWithBytes_length_(samples, len(samples)))
         
     def openDev(self):
+        print "zzz %s.openDev()" % (self,) # NOTE(review): looks like a debug trace
         self.buffer = ''
         self.audioMonitor.setDeviceOpen_(True)
     reopen = openDev
 
     def close(self):
+        print "zzz %s.close()" % (self,) # NOTE(review): looks like a debug trace
         self.audioMonitor.setDeviceOpen_(False)
 
     def isClosed(self):
_______________________________________________
Shtoom mailing list
Shtoom@python.org
http://mail.python.org/mailman/listinfo/shtoom

Reply via email to