Revision: 10057
Author: oleg.kulikoff
Date: Thu Feb 18 09:31:18 2010
Log: Issue 1258: RTP stack performance improvements
Issue 1283: Restore sequence of RTP packets
Issue 1276: Jitter buffer always return one packet.
Issue 1288: Media Player
http://code.google.com/p/mobicents/source/detail?r=10057
Modified:
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/JitterBuffer.java
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/ReceiveStream.java
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/RtpFactory.java
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/RtpPacket.java
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/RtpSocket.java
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/SendStream.java
=======================================
---
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/JitterBuffer.java
Wed Feb 10 05:35:57 2010
+++
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/JitterBuffer.java
Thu Feb 18 09:31:18 2010
@@ -15,6 +15,7 @@
import java.io.Serializable;
import org.apache.log4j.Logger;
+import org.mobicents.media.Buffer;
import org.mobicents.media.Format;
/**
@@ -37,17 +38,23 @@
* @author amit bhayani
*/
public class JitterBuffer implements Serializable {
-
- private int period;
private int jitter;
- private BufferConcurrentLinkedQueue<RtpPacket> queue = new
BufferConcurrentLinkedQueue();
+
+ private boolean readStarted = true;
+ private boolean writeStarted = false;
+
+ private RtpPacket[] queue = new RtpPacket[100];
+ private int readCursor;
+ private int writeCursor;
+
private volatile boolean ready = false;
private long duration;
+
private long timestamp;
private Format format;
private RtpClock clock;
- private Logger logger = Logger.getLogger(JitterBuffer.class);
+ private static Logger logger = Logger.getLogger(JitterBuffer.class);
private long drift;
private long r,s;
@@ -61,8 +68,7 @@
* @param jitter
* the size of the jitter in milliseconds.
*/
- public JitterBuffer(int jitter, int period) {
- this.period = period;
+ public JitterBuffer(int jitter) {
this.jitter = jitter;
}
@@ -83,10 +89,6 @@
public int getJitter() {
return jitter;
}
-
- public void setPeriod(int period) {
- this.period = period;
- }
public double getInterArrivalJitter() {
return j;
@@ -96,16 +98,17 @@
return jm;
}
- public void write(RtpPacket rtpPacket) {
- //calculate time
- long t = clock.getTime(rtpPacket.getTimestamp());
+ public void write(RtpPacket packet) {
+ //calculate time using absolute clock
long now = System.currentTimeMillis();
- rtpPacket.setTime(t);
+ //calculate time using rtp clock
+ long t = clock.getTime(packet.getTimestamp());
+ packet.setTime(t);
//calculating inter-arrival jitter
if (r > 0 && s > 0) {
- long D = (now - r) - (rtpPacket.getTime() - s);
+ long D = (now - r) - (packet.getTime() - s);
if (D < 0) {
D = -D;
}
@@ -115,48 +118,110 @@
}
}
- s = rtpPacket.getTime();
+ s = packet.getTime();
r = now;
- //if read() was not executed yet (timestamp equals to zero
indicates))
- //then we need to keep packets
- if (timestamp == 0) {
- queue.offer(rtpPacket);
- duration += period;
- if (!ready && duration > (period + jitter)) {
- ready = true;
- }
+ if (ready && readStarted && t <= timestamp) {
+ //read process already started, buffer is full and packet is
outstanding
+ //packet should be discarded
+ logger.warn("Packet " + packet + " is discarded by jitter
buffer( packet time=" + t + ", current time " + (timestamp));
return;
}
-
- //if buffer's ready flag equals true then it means that reading
- //starting and we should compare timestamp of arrived packet with
time of
- //last reading.
- if (ready && t < timestamp + period) {
- //silentrly discard otstanding packet
- logger.warn("Packet " + rtpPacket + " is discarded by jitter
buffer( packet time=" + t + ", current time " + (timestamp));
- return;
- }
-
- //if RTP packet is not outstanding or reading not started yet
(ready == false)
- //queue packet.
- rtpPacket.setTime(t);
- queue.offer(rtpPacket);
-
- //allow read when buffer is full;
- duration += period;
- if (!ready && duration > (period + jitter)) {
+
+ if (!writeStarted) {
+ //this is a first arrived packet
+ queue[0] = packet;
+ writeStarted = true;
+ } else {
+ //the last received packet
+ RtpPacket prev = queue[writeCursor];
+ long diff = packet.getSeqNumber() - prev.getSeqNumber();
+
+ //normally the packet just received must be the next in sequence
+ if (diff == 1) {
+ //everything is fine. writing packet and calculating
duration
+ //for previous one
+ writeCursor = inc(writeCursor, 1);
+ queue[writeCursor] = packet;
+
+ prev.setDuration(packet.getTime() - prev.getTime());
+ duration += prev.getDuration();
+ } else if (diff > 1) {
+ //this packet arrives before another one
+ //we do not know the fate of the missed packet(s)
+ //so we are updating duration like all missed packets are
lost
+ prev.setDuration(packet.getTime() - prev.getTime());
+
+ //but we will leave empty slots for missed packet(s) and
give
+ //them chance to arrive in time.
+ writeCursor = inc(writeCursor, (int)diff);
+ queue[writeCursor] = packet;
+
+ duration += prev.getDuration();
+ } else {
+ //diff < 0? this is a missing packet and it has arrived not too
late
+ //so we can process it.
+
+ int rightIndex = writeCursor;
+
+ //inserting this packet in its slot
+ writeCursor = inc(writeCursor, (int)diff);
+ queue[writeCursor] = packet;
+
+ //now we need to update duration of the packet in front of
this one and
+ //duration of this packet itself too
+
+ //searching for the left neighbor packet
+ int i = dec(writeCursor, 1);
+ int count = 0;
+ while (queue[i] == null && count < queue.length - 1) {
+ i = dec(i, 1);
+ count++;
+ }
+
+ queue[i].setDuration(packet.getTime() -
queue[i].getTime());
+
+ //now searching for the right neighbor packet
+ i = inc(writeCursor, 1);
+ while (queue[i] == null && i < rightIndex) {
+ i = inc(i, 1);
+ }
+
+ packet.setDuration(queue[i].getTime() - packet.getTime());
+
+ //the duration of the buffer is not changed!
+ }
+ }
+
+ if (!ready && duration > (jitter)) {
ready = true;
}
}
+
+ private int inc(int a, int diff) {
+ int res = a + diff;
+ if (res >= queue.length) {
+ res = res - queue.length;
+ }
+ return res;
+ }
+
+ private int dec(int a, int diff) {
+ int res = a - diff;
+ if (res < 0) {
+ res = queue.length - a;
+ }
+ return res;
+ }
+
public void reset() {
- queue.clear();
duration = 0;
clock.reset();
drift = 0;
r = 0;
s = 0;
+ readStarted = true;
}
/**
@@ -169,24 +234,35 @@
return null;
}
- //calculate drift for the first read()
- //drift equals to zero indicates that this is the first read
- if (drift == 0) {
- RtpPacket head = queue.peek();
- drift = head.getTime() - timestamp;
- }
-
- //calculate local timestamp taking into account drift
+ //before read any packets let's compute time drift between
+ //remote and local peers.
+ if (!readStarted) {
+ readStarted = true;
+ drift = queue[0].getTime() - timestamp;
+ }
+
+ //now our clock shows time specified as timestamp parameter
+ //the same time measured using remote clock is as follows:
this.timestamp = timestamp + drift;
- //if packet queue is empty (but was full) we have to return silence
- if (queue.isEmpty()) {
+ //we have to read from buffer all packets with timestamp less than
current
+ //time measured using remote clock.
+
+ //before reading we have to be sure that buffer is not empty
+ //we can do it if compare absolute read and write indexes
+ //when packet queue is not empty absolute read index is less than
write index
+ if (duration == 0) {
return null;
}
//fill media buffer
- //@TODO check packets using timestamp
-// System.out.println("r " + System.currentTimeMillis() + "/" +
this.timestamp);
- return queue.poll();
+ RtpPacket packet = queue[readCursor];
+
+ queue[readCursor] = null;
+
+ duration -= packet.getDuration();
+ readCursor = inc(readCursor, 1);
+
+ return packet;
}
}
=======================================
---
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/ReceiveStream.java
Wed Feb 10 05:35:57 2010
+++
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/ReceiveStream.java
Thu Feb 18 09:31:18 2010
@@ -48,25 +48,23 @@
private ArrayList<Format> formats = new ArrayList();
private AVProfile avProfile;
private Codec codec;
+
+ //RFC 2833
private DtmfConverter dtmfConverter = new DtmfConverter();
+
/** Creates a new instance of ReceiveStream */
public ReceiveStream(RtpSocket rtpSocket, int jitter, AVProfile
formatConfig) {
super("ReceiveStream");
this.avProfile = formatConfig;
this.rtpSocket = rtpSocket;
+
//synchronize stream from socket's timer
setSyncSource(rtpSocket.timer);
//construct jitter buffer
- jitterBuffer = new JitterBuffer(jitter, getPeriod());
+ jitterBuffer = new JitterBuffer(jitter);
jitterBuffer.setClock(rtpSocket.getClock());
}
-
- @Override
- public void setPeriod(int period) {
- super.setPeriod(period);
- jitterBuffer.setPeriod(period);
- }
/**
* Processes received RTP packet.
@@ -74,14 +72,15 @@
* @param rtpPacket packet for processing
*/
protected void process(RtpPacket rtpPacket) {
-// if (logger.isTraceEnabled()) {
-// logger.trace("Receive " + rtpPacket);
-// }
+ //write packet to jitter buffer.
+ //the purpose of jitter buffer is to transform variable jitter into
+ //fixed delay.
jitterBuffer.write(rtpPacket);
}
@Override
public void beforeStart() {
+ //let's reset jitter buffer
jitterBuffer.reset();
}
@@ -93,12 +92,17 @@
super.connect(sink);
}
+ /**
+ * Assigns payload number for rfc2833 dtmf.
+ *
+ * @param dtmf the number of payload.
+ */
public void setDtmf(int dtmf) {
this.dtmf = dtmf;
}
/**
- * Configures supported formats.
+ * Configures supported formats of main stream.
*
* @param payloadID the payload number of format used by rtp socket
* @param format the format used by rtp socket.
@@ -166,45 +170,50 @@
}
}
+ /**
+ * Gets the value of the interarrival jitter.
+ *
+ * @return jitter value
+ */
public double getInterArrivalJitter() {
return jitterBuffer.getInterArrivalJitter();
}
+ /**
+ * Returns maximum jitter value.
+ *
+ * @return the jitter value.
+ */
public double getMaxJitter() {
return jitterBuffer.getMaxJitter();
}
public void evolve(Buffer buffer, long timestamp, long seq) {
- buffer.setDuration(getPeriod());
- if (!rtpSocket.registered) {
- buffer.setFlags(Buffer.FLAG_SILENCE);
- return;
- }
+ //reading next packet from jitter buffer
RtpPacket packet = jitterBuffer.read(timestamp);
if (packet == null) {
- buffer.setFlags(Buffer.FLAG_SILENCE);
- return;
- }
- if (packet.getPayloadType() == mainstream) {
- byte[] data = packet.getPayload();
-
- buffer.setData(data);
- buffer.setLength(data.length);
+ buffer.setFlags(Buffer.FLAG_DISCARD);
+ buffer.setDuration(rtpSocket.getPeriod());
+ } else if (packet.getPayloadType() == mainstream) {
+ buffer.setData(packet.getPayload());
buffer.setOffset(0);
+ buffer.setLength(packet.getPayload().length);
+
+ if (packet.getDuration() != -1) {
+ buffer.setDuration(packet.getDuration());
+ } else {
+ buffer.setDuration(rtpSocket.getPeriod());
+ }
buffer.setFormat(format);
- buffer.setDuration(getPeriod());
- buffer.setFlags(0);
} else if (packet.getPayloadType() == dtmf) {
- dtmfConverter.process(packet, getPeriod(), buffer);
+ dtmfConverter.process(packet, buffer);
}
-
- if (codec != null && buffer.getFlags() != Buffer.FLAG_SILENCE) {
+ if (codec != null) {
codec.process(buffer);
}
-
+
buffer.setSequenceNumber(seq);
- buffer.setTimeStamp(packet.getTime());
}
/**
=======================================
---
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/RtpFactory.java
Wed Feb 10 05:35:57 2010
+++
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/RtpFactory.java
Thu Feb 18 09:31:18 2010
@@ -39,7 +39,6 @@
import java.util.Hashtable;
import java.util.List;
-import java.util.Set;
import net.java.stun4j.StunAddress;
import net.java.stun4j.StunException;
import net.java.stun4j.client.NetworkConfigurationDiscoveryProcess;
@@ -61,10 +60,9 @@
public class RtpFactory {
//private HashMap<String, Transceiver> transceivers = new HashMap();
- private Receiver transceiver;
+ private Receiver receiver;
private Integer jitter = 60;
private InetAddress bindAddress;
- private Hashtable<String, Integer> localPorts;
protected InetSocketAddress publicAddress;
private String stunHost;
private int stunPort = 3478;
@@ -81,14 +79,6 @@
*/
public RtpFactory() {
}
-
- public Hashtable<String, Integer> getLocalPorts() {
- return localPorts;
- }
-
- public void setLocalPorts(Hashtable<String, Integer> localPorts) {
- this.localPorts = localPorts;
- }
/**
* Gets the address of stun server if present.
@@ -114,20 +104,8 @@
}
public void start() throws SocketException, IOException, StunException
{
- transceiver = new Receiver(this);
- transceiver.start();
- //creating transcievers for each local port defined
- Set<String> mediaTypes = localPorts.keySet();
- for (String media : mediaTypes) {
- socketsPool.put(media, new ArrayList());
- int localPort = localPorts.get(media);
-
- InetSocketAddress address = new InetSocketAddress(bindAddress,
localPort);
- logger.info("Binding RTP transceiver to " + bindAddress + ":"
+ localPort);
-
- }
-
-
+ receiver = new Receiver(this);
+ receiver.start();
//prepare sockets
ArrayList<RtpSocket> list = new ArrayList();
@@ -148,7 +126,7 @@
}
public void stop() {
- transceiver.stop();
+ receiver.stop();
}
public int getPeriod() {
@@ -264,14 +242,14 @@
}
protected Selector getSelector() {
- return transceiver.getSelector();
+ return receiver.getSelector();
}
protected void register() {
while (!registerQueue.isEmpty()) {
RtpSocket socket = registerQueue.poll();
try {
- socket.register(transceiver.getSelector());
+ socket.register(receiver.getSelector());
} catch (ClosedChannelException e) {
socket.release();
}
@@ -289,6 +267,10 @@
*/
public RtpSocket getRTPSocket(String media) throws IOException,
ResourceUnavailableException {
// return object from local if present
+ if (!socketsPool.containsKey(media)) {
+ ArrayList<RtpSocket> list = new ArrayList();
+ socketsPool.put(media, list);
+ }
Collection<RtpSocket> list = socketsPool.get(media);
if (!list.isEmpty()) {
RtpSocket socket = socketsPool.get(media).remove(0);
=======================================
---
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/RtpPacket.java
Wed Feb 10 05:35:57 2010
+++
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/RtpPacket.java
Thu Feb 18 09:31:18 2010
@@ -15,10 +15,6 @@
*/
package org.mobicents.media.server.impl.rtp;
-import java.io.ByteArrayInputStream;
-import java.io.ByteArrayOutputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
import java.io.Serializable;
import java.nio.ByteBuffer;
@@ -41,6 +37,8 @@
private int offset = 0;
private int length = 0;
private long time;
+ private long duration = -1;
+ private boolean isValid = false;
private byte[] buff;
@@ -75,59 +73,8 @@
payload = new byte[len - 12];
System.arraycopy(buff, 12, payload, 0, payload.length);
-
-/* int b = readerBuffer.get() & 0xff;
-
- version = (b & 0x0C) >> 6;
- padding = (b & 0x20) == 0x020;
- extensions = (b & 0x10) == 0x10;
- cc = b & 0x0F;
-
- b = readerBuffer.get() & 0xff;
- marker = (b & 0x80) == 0x80;
- payloadType = b & 0x7F;
-
- seqNumber = (readerBuffer.get() & 0xff) << 8;
- seqNumber = seqNumber | (readerBuffer.get() & 0xff);
-
- timestamp = readerBuffer.getInt();
- ssrc = readerBuffer.getInt();
-
- payload = new byte[len - 12];
- readerBuffer.get(payload, 0, payload.length);
- */
}
- /** Creates a new instance of RtpPacket */
- public RtpPacket(byte[] data) throws IOException {
- DataInputStream in = new DataInputStream(new
ByteArrayInputStream(data));
- int b = in.read() & 0xff;
- version = (b & 0x0C) >> 6;
- padding = (b & 0x20) == 0x020;
- extensions = (b & 0x10) == 0x10;
- cc = b & 0x0F;
-
- b = in.read() & 0xff;
-
- marker = (b & 0x80) == 0x80;
- payloadType = b & 0x7F;
- seqNumber = (in.read() & 0xff) << 8;
- seqNumber = seqNumber | (in.read() & 0xff);
-
- timestamp = in.readInt();
- ssrc = in.readInt();
-
- payload = new byte[8192];
- int numBytes = in.read(payload);
- if (numBytes < 0) {
- numBytes = 0;
- }
- byte[] realPayload = new byte[numBytes];
- for (int q = 0; q < numBytes; q++) {
- realPayload[q] = payload[q];
- }
- payload = realPayload;
- }
public RtpPacket(byte payloadType, int seqNumber, int timestamp, long
ssrc,
byte[] payload) {
@@ -152,6 +99,15 @@
this.buff = new byte[payload.length + 12];
}
+
+ public boolean isValid() {
+ return isValid;
+ }
+
+ public void setValid(boolean isValid) {
+ this.isValid = isValid;
+ }
+
public boolean getMarker() {
return marker;
}
@@ -179,6 +135,14 @@
public void setTime(long time) {
this.time = time;
}
+
+ public long getDuration() {
+ return duration;
+ }
+
+ public void setDuration(long duration) {
+ this.duration = duration;
+ }
public byte[] toByteArray() {
buff[0] = 0x8;
@@ -196,26 +160,6 @@
buff[10] =((byte) ((ssrc & 0x0000FF00) >> 8));
buff[11] =((byte) ((ssrc & 0x000000FF)));
System.arraycopy(payload, offset, buff, 12, length);
-/* ByteArrayOutputStream bout = new ByteArrayOutputStream();
- bout.write(0x80);
- bout.write(payloadType);
- bout.write((byte) ((seqNumber & 0xFF00) >> 8));
- bout.write((byte) (seqNumber & 0x00FF));
-
- bout.write((byte) ((timestamp & 0xFF000000) >> 24));
- bout.write((byte) ((timestamp & 0x00FF0000) >> 16));
- bout.write((byte) ((timestamp & 0x0000FF00) >> 8));
- bout.write((byte) ((timestamp & 0x000000FF)));
-
- bout.write((byte) ((ssrc & 0xFF000000) >> 24));
- bout.write((byte) ((ssrc & 0x00FF0000) >> 16));
- bout.write((byte) ((ssrc & 0x0000FF00) >> 8));
- bout.write((byte) ((ssrc & 0x000000FF)));
-
- bout.write(payload, offset, length);
-
- return bout.toByteArray();
- */
return buff;
}
=======================================
---
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/RtpSocket.java
Wed Feb 10 05:35:57 2010
+++
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/RtpSocket.java
Thu Feb 18 09:31:18 2010
@@ -167,9 +167,12 @@
public void setPeriod(int period) {
this.period = period;
- receiveStream.setPeriod(period);
}
+ public int getPeriod() {
+ return period;
+ }
+
public Collection<Codec> getCodecs() {
return codecs;
}
@@ -318,10 +321,20 @@
packetsReceived = 0;
}
+ /**
+ * Gets the currently assigned listener.
+ *
+ * @return the listener instance.
+ */
public RtpSocketListener getListener() {
return listener;
}
+ /**
+ * Assigns listener which will receive notifications.
+ *
+ * @param listener the listener instance.
+ */
public void setListener(RtpSocketListener listener) {
this.listener = listener;
}
@@ -362,6 +375,12 @@
send(p);
}
+ /**
+ * Sends rtp packet to the remote peer.
+ *
+ * @param packet the rtp packet as binary array
+ * @throws java.io.IOException
+ */
public void send(byte[] packet) throws IOException {
//coverting packet to binary array and sent to the remote address.
sendBuffer.rewind();
=======================================
---
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/SendStream.java
Wed Feb 10 05:35:57 2010
+++
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/impl/rtp/SendStream.java
Thu Feb 18 09:31:18 2010
@@ -31,9 +31,9 @@
import java.util.Collection;
import org.mobicents.media.Buffer;
import org.mobicents.media.Format;
-import org.mobicents.media.MediaSink;
import org.mobicents.media.MediaSource;
import org.mobicents.media.server.impl.AbstractSink;
+import org.mobicents.media.server.impl.resource.dtmf.DtmfEvent;
import org.mobicents.media.server.impl.rtp.sdp.AVProfile;
import org.mobicents.media.server.spi.dsp.Codec;
@@ -102,12 +102,7 @@
public void onMediaTransfer(Buffer buffer) throws IOException {
RtpPacket packet = null;
- int timestamp = (int) clock.getTimestamp(buffer.getTimeStamp());
- packet = new RtpPacket(mainstream, seq++, timestamp, ssrc,
- buffer.getData(), buffer.getOffset(),
buffer.getLength());
- rtpSocket.send(packet);
-/*
- if (buffer.getFlags() == Buffer.FLAG_RTP_BINARY) {
+ if (buffer.getFormat() == Format.RAW_RTP) {
rtpSocket.send(buffer.getData());
return;
}
@@ -117,8 +112,7 @@
}
int timestamp = (int) clock.getTimestamp(buffer.getTimeStamp());
- RtpPacket packet = null;
-
+
if (buffer.getHeader() != null && buffer.getHeader() instanceof
DtmfEvent && dtmf > 0) {
DtmfEvent evt = (DtmfEvent) buffer.getHeader();
int digit = evt.getEventID();
@@ -132,8 +126,7 @@
data[3] = (byte) (eventDuration);
eventDuration = eventDuration + 160;
- packet = new RtpPacket(dtmf, seq++, timestamp, ssrc,
- data, 0, 4);
+ packet = new RtpPacket(dtmf, seq++, timestamp, ssrc, data, 0,
4);
} else {
packet = new RtpPacket(mainstream, seq++, timestamp, ssrc,
buffer.getData(), buffer.getOffset(),
buffer.getLength());
@@ -142,8 +135,7 @@
if (logger.isTraceEnabled()) {
logger.trace("Sending " + packet);
- }
- */
+ }
}
/**