Revision: 9168
Author: [email protected]
Date: Tue Dec 29 23:14:58 2009
Log: moved RTSPEndpoint and RTSPConnection to server-impl
http://code.google.com/p/mobicents/source/detail?r=9168

Added:
/trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/RtspEndpointImpl.java /trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/RtspRtpConnectionImpl.java
Deleted:
/trunk/servers/media/core/controllers/rtsp/src/main/java/org/mobicents/media/server/ctrl/rtsp/rtp /trunk/servers/media/core/controllers/rtsp/src/main/java/org/mobicents/media/server/rtsp

=======================================
--- /dev/null
+++ /trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/RtspEndpointImpl.java Tue Dec 29 23:14:58 2009
@@ -0,0 +1,286 @@
+package org.mobicents.media.server;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.sdp.SdpFactory;
+
+import org.apache.log4j.Logger;
+import org.mobicents.media.Component;
+import org.mobicents.media.ComponentFactory;
+import org.mobicents.media.MediaSink;
+import org.mobicents.media.MediaSource;
+import org.mobicents.media.server.impl.rtp.RtpFactory;
+import org.mobicents.media.server.resource.ChannelFactory;
+import org.mobicents.media.server.spi.Connection;
+import org.mobicents.media.server.spi.ConnectionListener;
+import org.mobicents.media.server.spi.ConnectionMode;
+import org.mobicents.media.server.spi.Endpoint;
+import org.mobicents.media.server.spi.NotificationListener;
+import org.mobicents.media.server.spi.ResourceUnavailableException;
+import org.mobicents.media.server.spi.Timer;
+import org.mobicents.media.server.spi.TooManyConnectionsException;
+
+/**
+ * {@link Endpoint} implementation whose connections are
+ * {@link RtspRtpConnectionImpl} instances.
+ * <p>
+ * When {@link #start()} is called the endpoint creates one {@link MediaSource}
+ * per entry of the configured source factories and one {@link MediaSink} per
+ * entry of the configured sink factories, keyed by media type. The connection
+ * table is modified only while the {@link #state} lock is held; the plain
+ * accessors ({@link #getConnection(String)}, {@link #hasConnections()},
+ * {@link #isInUse()}) read without taking that lock.
+ */
+public class RtspEndpointImpl implements Endpoint {
+
+    private final Logger logger = Logger.getLogger(RtspEndpointImpl.class);
+
+    private String localName;
+    private boolean isInUse = false;
+    private Timer timer;
+
+    /** Factories used by {@link #start()} to build the media sources, keyed by media type. */
+    private Map<String, ComponentFactory> sourceFactory;
+    /** Factories used by {@link #start()} to build the media sinks, keyed by media type. */
+    private Map<String, ComponentFactory> sinkFactory;
+
+    /** Media sources created by {@link #start()}, keyed by media type. */
+    private HashMap<String, MediaSource> sources = new HashMap<String, MediaSource>();
+    /** Media sinks created by {@link #start()}, keyed by media type. */
+    private HashMap<String, MediaSink> sinks = new HashMap<String, MediaSink>();
+
+    /** Media types for which a source and/or a sink has been created. */
+    private ArrayList<String> mediaTypes = new ArrayList<String>();
+
+    private ConnectionFactory connectionFactory;
+    private Hashtable<String, RtpFactory> rtpFactory;
+
+    /** Active connections keyed by connection id; mutated under {@link #state}. */
+    protected transient HashMap<String, Connection> connections = new HashMap<String, Connection>();
+    protected ReentrantLock state = new ReentrantLock();
+
+    private SdpFactory sdpFactory = SdpFactory.getInstance();
+
+    /** The list of indexes available for connection enumeration within endpoint */
+    private ArrayList<Integer> index = new ArrayList<Integer>();
+    /** The last generated connection's index */
+    private int lastIndex = -1;
+
+    /** Creates an endpoint without a local name. */
+    public RtspEndpointImpl() {
+    }
+
+    /**
+     * Creates an endpoint with the given local name.
+     *
+     * @param localName the name returned by {@link #getLocalName()}
+     */
+    public RtspEndpointImpl(String localName) {
+        this.localName = localName;
+    }
+
+    public void setSourceFactory(Map<String, ComponentFactory> sourceFactory) {
+        this.sourceFactory = sourceFactory;
+    }
+
+    public Map<String, ComponentFactory> getSourceFactory() {
+        return sourceFactory;
+    }
+
+    public void setSinkFactory(Map<String, ComponentFactory> sinkFactory) {
+        this.sinkFactory = sinkFactory;
+    }
+
+    public Map<String, ComponentFactory> getSinkFactory() {
+        return sinkFactory;
+    }
+
+    /**
+     * @return the live list of media types registered by {@link #start()}
+     */
+    public Collection<String> getMediaTypes() {
+        return mediaTypes;
+    }
+
+    /**
+     * @param media the media type
+     * @return the sink created for that media type, or {@code null} if none
+     */
+    public MediaSink getSink(String media) {
+        return sinks.get(media);
+    }
+
+    /**
+     * @param media the media type
+     * @return the source created for that media type, or {@code null} if none
+     */
+    public MediaSource getSource(String media) {
+        return sources.get(media);
+    }
+
+    public SdpFactory getSdpFactory() {
+        return sdpFactory;
+    }
+
+    /**
+     * Assigns the connection factory after verifying that none of its rx/tx
+     * channel factories declares pipes or components.
+     *
+     * @param connectionFactory the factory to use for new connections
+     * @throws UnsupportedOperationException if any channel factory declares
+     *         at least one pipe or component
+     */
+    public void setConnectionFactory(ConnectionFactory connectionFactory) {
+        Map<String, ChannelFactory> rxChannelFactories = connectionFactory.getRxChannelFactory();
+        Map<String, ChannelFactory> txChannelFactories = connectionFactory.getTxChannelFactory();
+
+        rejectCustomMediaPath(rxChannelFactories);
+        rejectCustomMediaPath(txChannelFactories);
+
+        this.connectionFactory = connectionFactory;
+    }
+
+    /** Throws if any of the given channel factories declares pipes or components. */
+    private static void rejectCustomMediaPath(Map<String, ChannelFactory> channelFactories) {
+        for (ChannelFactory chFact : channelFactories.values()) {
+            if (chFact.getPipes().size() != 0 || chFact.getComponents().size() != 0) {
+                throw new UnsupportedOperationException("RTSPEndpoint cannot be configured for custom media path");
+            }
+        }
+    }
+
+    public ConnectionFactory getConnectionFactory() {
+        return this.connectionFactory;
+    }
+
+    public void setRtpFactory(Hashtable<String, RtpFactory> rtpFactory) {
+        this.rtpFactory = rtpFactory;
+    }
+
+    public Hashtable<String, RtpFactory> getRtpFactory() {
+        return this.rtpFactory;
+    }
+
+    /** Not supported: always throws {@link UnsupportedOperationException}. */
+    public void addConnectionListener(ConnectionListener listener) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    /** Not supported: always throws {@link UnsupportedOperationException}. */
+    public void addNotificationListener(NotificationListener listener) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    /**
+     * Creates a new {@link RtspRtpConnectionImpl}, registers it under its id
+     * and marks the endpoint as in use. Runs under the {@link #state} lock.
+     *
+     * @param mode the mode handed to the new connection
+     * @return the newly created connection
+     * @throws ResourceUnavailableException if the connection cannot be
+     *         created; any other failure is wrapped with its cause preserved
+     */
+    public Connection createConnection(ConnectionMode mode) throws TooManyConnectionsException,
+            ResourceUnavailableException {
+        state.lock();
+        try {
+            if (logger.isDebugEnabled()) {
+                logger.debug(getLocalName() + ", creating RTP connection, mode=" + mode);
+            }
+            RtspRtpConnectionImpl connection = new RtspRtpConnectionImpl(this, mode);
+            connections.put(connection.getId(), connection);
+            this.isInUse = true;
+            return connection;
+        } catch (ResourceUnavailableException e) {
+            // already the declared type: rethrow untouched instead of re-wrapping
+            logger.error("Could not create RTP connection", e);
+            throw e;
+        } catch (Exception e) {
+            logger.error("Could not create RTP connection", e);
+            // keep the original exception as the cause so the stack trace is not lost
+            throw new ResourceUnavailableException(e);
+        } finally {
+            state.unlock();
+        }
+    }
+
+    /**
+     * Not implemented: always returns {@code null}.
+     */
+    public Connection createLocalConnection(ConnectionMode mode) throws TooManyConnectionsException,
+            ResourceUnavailableException {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    /**
+     * Deletes every registered connection while holding the {@link #state} lock.
+     */
+    public void deleteAllConnections() {
+        state.lock();
+        try {
+            // Snapshot the ids first: deleteConnection() removes entries from the map.
+            // Connections are registered under their own id by createConnection().
+            String[] ids = connections.keySet().toArray(new String[connections.size()]);
+            for (String id : ids) {
+                deleteConnection(id);
+            }
+        } finally {
+            state.unlock();
+        }
+    }
+
+    /**
+     * Removes and closes the connection with the given id, returns its index
+     * to the pool of free indexes and refreshes the in-use flag. Unknown ids
+     * only refresh the in-use flag.
+     *
+     * @param connectionID id of the connection to delete
+     */
+    public void deleteConnection(String connectionID) {
+        state.lock();
+        try {
+            ConnectionImpl connection = (ConnectionImpl) connections.remove(connectionID);
+            if (connection != null) {
+                if (logger.isDebugEnabled()) {
+                    logger.debug(getLocalName() + ", Deleting connection " + connection.getIndex());
+                }
+                connection.close();
+                index.add(connection.getIndex());
+            }
+            isInUse = connections.size() > 0;
+        } finally {
+            state.unlock();
+        }
+    }
+
+    /**
+     * Looks up a source first, then a sink, whose name matches the given
+     * resource name. The argument is used as a regular expression because the
+     * comparison is done with {@link String#matches(String)}.
+     *
+     * @param resourceName regular expression the component name must match
+     * @return the first matching component, or {@code null} if none matches
+     */
+    public Component getComponent(String resourceName) {
+        for (MediaSource source : sources.values()) {
+            if (source.getName().matches(resourceName)) {
+                return source;
+            }
+        }
+
+        for (MediaSink sink : sinks.values()) {
+            if (sink.getName().matches(resourceName)) {
+                return sink;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * @param connectionID the connection id
+     * @return the registered connection, or {@code null} if the id is unknown
+     */
+    public Connection getConnection(String connectionID) {
+        return connections.get(connectionID);
+    }
+
+    public String getLocalName() {
+        return localName;
+    }
+
+    public void setLocalName(String localName) {
+        this.localName = localName;
+    }
+
+    public Timer getTimer() {
+        return timer;
+    }
+
+    public void setTimer(Timer timer) {
+        this.timer = timer;
+    }
+
+    public boolean hasConnections() {
+        return !connections.isEmpty();
+    }
+
+    public boolean isInUse() {
+        return this.isInUse;
+    }
+
+    /** Not supported: always throws {@link UnsupportedOperationException}. */
+    public void removeConnectionListener(ConnectionListener listener) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    /** Not supported: always throws {@link UnsupportedOperationException}. */
+    public void removeNotificationListener(NotificationListener listener) {
+        throw new UnsupportedOperationException("Not supported yet.");
+    }
+
+    public void setInUse(boolean inUse) {
+        this.isInUse = inUse;
+    }
+
+    /**
+     * Instantiates one media source per configured source factory and one
+     * media sink per configured sink factory, attaches each to this endpoint
+     * and records the media types encountered. A {@code null} factory map is
+     * skipped.
+     */
+    public void start() throws ResourceUnavailableException {
+        if (sourceFactory != null) {
+            for (Map.Entry<String, ComponentFactory> entry : sourceFactory.entrySet()) {
+                String media = entry.getKey();
+                MediaSource source = (MediaSource) entry.getValue().newInstance(this);
+                source.setEndpoint(this);
+                sources.put(media, source);
+                registerMediaType(media);
+            }
+        }
+
+        if (sinkFactory != null) {
+            for (Map.Entry<String, ComponentFactory> entry : sinkFactory.entrySet()) {
+                String media = entry.getKey();
+                MediaSink sink = (MediaSink) entry.getValue().newInstance(this);
+                sink.setEndpoint(this);
+                sinks.put(media, sink);
+                registerMediaType(media);
+            }
+        }
+    }
+
+    /** Adds the media type to the list of supported types unless already present. */
+    private void registerMediaType(String media) {
+        if (!mediaTypes.contains(media)) {
+            mediaTypes.add(media);
+        }
+    }
+
+    /** Only logs that the endpoint was stopped. */
+    public void stop() {
+        logger.info("Stopped " + localName);
+    }
+
+    /**
+     * Hands out a connection index: a previously released index if one is
+     * available, otherwise the next never-used index.
+     *
+     * @return the index for a new connection
+     */
+    public int getConnectionIndex() {
+        return index.isEmpty() ? ++lastIndex : index.remove(0);
+    }
+
+}
=======================================
--- /dev/null
+++ /trunk/servers/media/core/server-impl/src/main/java/org/mobicents/media/server/RtspRtpConnectionImpl.java Tue Dec 29 23:14:58 2009
@@ -0,0 +1,310 @@
+package org.mobicents.media.server;
+
+import java.io.IOException;
+import java.net.InetAddress;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+import java.util.Set;
+import java.util.Vector;
+
+import javax.sdp.Attribute;
+import javax.sdp.MediaDescription;
+import javax.sdp.SdpException;
+import javax.sdp.SdpFactory;
+import javax.sdp.SdpParseException;
+import javax.sdp.SessionDescription;
+
+import org.mobicents.media.Format;
+import org.mobicents.media.MediaSink;
+import org.mobicents.media.MediaSource;
+import org.mobicents.media.format.AudioFormat;
+import org.mobicents.media.format.VideoFormat;
+import org.mobicents.media.server.impl.rtp.RtpFactory;
+import org.mobicents.media.server.impl.rtp.RtpSocket;
+import org.mobicents.media.server.impl.rtp.RtpSocketListener;
+import org.mobicents.media.server.impl.rtp.sdp.AVProfile;
+import org.mobicents.media.server.resource.Channel;
+import org.mobicents.media.server.resource.ChannelFactory;
+import org.mobicents.media.server.spi.Connection;
+import org.mobicents.media.server.spi.ConnectionMode;
+import org.mobicents.media.server.spi.ConnectionState;
+import org.mobicents.media.server.spi.ResourceUnavailableException;
+
+public class RtspRtpConnectionImpl extends ConnectionImpl implements RtpSocketListener {
+
+ private HashMap<String, RtpSocket> rtpSockets = new HashMap<String, RtpSocket>();
+
+       private String remoteDescriptor;
+       private SdpFactory sdpFactory;
+
+ public RtspRtpConnectionImpl(RtspEndpointImpl endpoint, ConnectionMode mode) throws ResourceUnavailableException {
+               super(endpoint, mode);
+
+               sdpFactory = endpoint.getSdpFactory();
+               // obtain channel factory
+               ConnectionFactory connectionFactory = 
endpoint.getConnectionFactory();
+ Map<String, ChannelFactory> rxChannelFactories = connectionFactory.getRxChannelFactory(); + Map<String, ChannelFactory> txChannelFactories = connectionFactory.getTxChannelFactory();
+
+               // obtain rtp factories
+               Hashtable<String, RtpFactory> rtpFactories = 
endpoint.getRtpFactory();
+
+               // obtain list of supported media types
+               Collection<String> mediaTypes = endpoint.getMediaTypes();
+
+               // creating channel for each media type
+               for (String media : mediaTypes) {
+                       // creating rx channel
+                       ChannelFactory rxChannelFactory = 
rxChannelFactories.get(media);
+                       Channel rxChannel = 
rxChannelFactory.newInstance(endpoint);
+
+                       rxChannel.setConnection(this);
+                       rxChannel.setEndpoint(endpoint);
+
+                       MediaSink sink = endpoint.getSink(media);
+                       rxChannel.connect(sink);
+                       rxChannels.put(media, rxChannel);
+
+                       // creating tx channel
+                       ChannelFactory txChannelFactory = 
txChannelFactories.get(media);
+                       Channel txChannel = 
txChannelFactory.newInstance(endpoint);
+
+                       txChannel.setConnection(this);
+                       txChannel.setEndpoint(endpoint);
+
+                       MediaSource source = endpoint.getSource(media);
+                       txChannel.connect(source);
+                       txChannels.put(media, txChannel);
+
+                       // create rtp socket
+                       RtpFactory rtpFactory = rtpFactories.get(media);
+                       if (rtpFactory == null) {
+ throw new ResourceUnavailableException("RTP socket for media " + media + " expected");
+                       }
+                       RtpSocket socket = null;
+                       try {
+                               socket = rtpFactory.getRTPSocket();
+                               // save reference
+                               rtpSockets.put(media, socket);
+                       } catch (Exception e) {
+                               throw new ResourceUnavailableException(e);
+                       }
+               }
+
+               this.mode = mode;
+               setState(ConnectionState.HALF_OPEN);
+       }
+
+       public String getLocalDescriptor() {
+               throw new UnsupportedOperationException("Local SDP is not defined 
");
+
+       }
+
+       public long getPacketsReceived(String media) {
+               return rtpSockets.get(media).getSendStream().getBytesReceived();
+       }
+
+       public long getPacketsTransmitted(String media) {
+               return 
rtpSockets.get(media).getReceiveStream().getPacketsTransmitted();
+       }
+
+       public String getRemoteDescriptor() {
+               return this.remoteDescriptor;
+       }
+
+       public void txStart(String media) {
+               throw new UnsupportedOperationException("Not supported yet.");
+       }
+
+       public void rxStart(String media) {
+               throw new UnsupportedOperationException("Not supported yet.");
+       }
+
+       public void txStop(String media) {
+               throw new UnsupportedOperationException("Not supported yet.");
+       }
+
+       public void rxStop(String media) {
+               throw new UnsupportedOperationException("Not supported yet.");
+       }
+
+       public void setOtherParty(Connection other) throws IOException {
+               throw new UnsupportedOperationException("Not supported yet.");
+
+       }
+
+ public void setRemoteDescriptor(String descriptor) throws SdpException, IOException, ResourceUnavailableException {
+               this.remoteDescriptor = descriptor;
+
+ if (getState() != ConnectionState.HALF_OPEN && getState() != ConnectionState.OPEN) {
+                       throw new IllegalStateException("State is " + 
getState());
+               }
+
+               // parse session descriptor
+               SessionDescription sdp = 
sdpFactory.createSessionDescription(descriptor);
+
+               // determine address of the remote party
+ InetAddress address = InetAddress.getByName(sdp.getConnection().getAddress());
+
+               // analyze media descriptions
+ Vector<MediaDescription> mediaDescriptions = sdp.getMediaDescriptions(false);
+               for (MediaDescription md : mediaDescriptions) {
+                       // determine media type
+                       String mediaType = md.getMedia().getMediaType();
+
+                       // parse offered formats
+                       HashMap<Integer, Format> offer = new HashMap();
+                       if (mediaType.equals("audio")) {
+                               this.parseAudioFormats(md, offer);
+                       } else if (mediaType.equals("video")) {
+                               this.parseVideoFormats(md, offer);
+                       }
+
+                       if (offer.size() != 1) {
+ throw new SdpException("More than one Format offered for media type = " + mediaType);
+                       }
+
+                       RtpSocket rtpSocket = rtpSockets.get(mediaType);
+
+                       // assign preffered format
+ rtpSocket.setFormat(offer.keySet().iterator().next(), offer.values().iterator().next());
+
+                       int port = md.getMedia().getMediaPort();
+                       rtpSocket.setPeer(address, port);
+
+                       Channel rxChannel = rxChannels.get(mediaType);
+                       Channel txChannel = txChannels.get(mediaType);
+
+                       rxChannel.connect(rtpSocket.getReceiveStream());
+                       txChannel.connect(rtpSocket.getSendStream());
+
+                       rxChannel.start();
+                       txChannel.start();
+
+                       rtpSocket.getReceiveStream().start();
+                       rtpSocket.getSendStream().start();
+
+                       setMode(mode);
+               }
+
+               setState(ConnectionState.OPEN);
+       }
+
+       @Override
+       public void close() {
+               EndpointImpl endpoint = (EndpointImpl) getEndpoint();
+               Set<String> mediaTypes = rtpSockets.keySet();
+               for (String media : mediaTypes) {
+                       RtpSocket socket = rtpSockets.get(media);
+
+                       Channel rxChannel = rxChannels.get(media);
+                       Channel txChannel = txChannels.get(media);
+
+                       rxChannel.stop();
+                       txChannel.stop();
+
+                       socket.getReceiveStream().stop();
+                       socket.getSendStream().stop();
+
+                       rxChannel.disconnect(endpoint.getSink(media));
+                       txChannel.disconnect(endpoint.getSource(media));
+
+                       rxChannel.disconnect(socket.getReceiveStream());
+                       txChannel.disconnect(socket.getSendStream());
+
+                       rxChannel.close();
+                       txChannel.close();
+
+                       socket.release();
+               }
+
+               rxChannels.clear();
+               txChannels.clear();
+
+               rtpSockets.clear();
+               super.close();
+       }
+
+       public void error(Exception e) {
+               getEndpoint().deleteConnection(this.getId());
+       }
+
+ // TODO This computation from RtpConnectionImpl and RtspRtpConnectionImple should go to some common file + private void parseAudioFormats(MediaDescription md, HashMap<Integer, Format> formats) throws SdpParseException {
+               Vector<String> payloads = md.getMedia().getMediaFormats(false);
+               for (String payload : payloads) {
+                       Integer pt = Integer.parseInt(payload);
+                       formats.put(pt, AVProfile.getAudioFormat(pt));
+               }
+               Vector<Attribute> attributes = md.getAttributes(false);
+               for (Attribute attribute : attributes) {
+                       if (attribute.getName().equals("rtpmap")) {
+                               parseAudioFormat(attribute.getValue(), formats);
+                       }
+               }
+       }
+
+ private void parseVideoFormats(MediaDescription md, HashMap<Integer, Format> formats) throws SdpParseException {
+               Vector<String> payloads = md.getMedia().getMediaFormats(false);
+               for (String payload : payloads) {
+                       Integer pt = Integer.parseInt(payload);
+                       formats.put(pt, AVProfile.getVideoFormat(pt));
+               }
+               Vector<Attribute> attributes = md.getAttributes(false);
+               for (Attribute attribute : attributes) {
+                       if (attribute.getName().equals("rtpmap")) {
+                               parseVideoFormat(attribute.getValue(), formats);
+                       }
+               }
+       }
+
+ private void parseAudioFormat(String rtpmap, HashMap<Integer, Format> formats) {
+               String tokens[] = rtpmap.toLowerCase().split(" ");
+
+               // split params
+               int p = Integer.parseInt(tokens[0]);
+               tokens = tokens[1].split("/");
+
+               String encodingName = tokens[0];
+               double clockRate = Double.parseDouble(tokens[1]);
+
+               int chans = 1;
+               if (tokens.length == 3) {
+                       chans = Integer.parseInt(tokens[2]);
+               }
+
+               if (encodingName.equals("pcmu")) {
+                       formats.put(p, new AudioFormat(AudioFormat.ULAW, 
clockRate, 8, chans));
+               } else if (encodingName.equals("pcma")) {
+                       formats.put(p, new AudioFormat(AudioFormat.ALAW, 
clockRate, 8, chans));
+               } else if (encodingName.equals("speex")) {
+ formats.put(p, new AudioFormat(AudioFormat.SPEEX, clockRate, AudioFormat.NOT_SPECIFIED, chans));
+               } else if (encodingName.equals("telephone-event")) {
+ formats.put(p, new AudioFormat("telephone-event", clockRate, AudioFormat.NOT_SPECIFIED,
+                                       AudioFormat.NOT_SPECIFIED));
+               } else if (encodingName.equals("g729")) {
+ formats.put(p, new AudioFormat(AudioFormat.G729, clockRate, AudioFormat.NOT_SPECIFIED, chans));
+               } else if (encodingName.equals("gsm")) {
+ formats.put(p, new AudioFormat(AudioFormat.GSM, clockRate, AudioFormat.NOT_SPECIFIED, chans));
+               } else if (encodingName.equals("l16")) {
+ formats.put(p, new AudioFormat(AudioFormat.LINEAR, clockRate, 16, chans, AudioFormat.LITTLE_ENDIAN,
+                                       AudioFormat.SIGNED));
+               }
+       }
+
+ private void parseVideoFormat(String rtpmap, HashMap<Integer, Format> formats) {
+               String tokens[] = rtpmap.toLowerCase().split(" ");
+
+               // split params
+               int p = Integer.parseInt(tokens[0]);
+               tokens = tokens[1].split("/");
+
+               String encodingName = tokens[0];
+               double clockRate = Double.parseDouble(tokens[1]);
+
+               formats.put(p, new VideoFormat(encodingName, 25, (int) 
clockRate));
+       }
+
+}

Reply via email to