Revision: 12091
Author: baranowb
Date: Mon Jun  7 02:25:02 2010
Log: some fixes.... on B2 more is a MUST
http://code.google.com/p/mobicents/source/detail?r=12091

Modified:
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/MTP.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/Mtp3.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/MtpUser.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/MTPListener.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/MTPProvider.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/StreamForwarder.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/tcp/M3UserAgent.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/tcp/M3UserConnector.java

=======================================
--- /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/MTP.java Fri May 7 00:31:54 2010
+++ /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/MTP.java Mon Jun 7 02:25:02 2010
@@ -105,6 +105,20 @@
     }

     /**
+        * @return the opc
+        */
+       public int getOpc() {
+               return opc;
+       }
+
+       /**
+        * @return the dpc
+        */
+       public int getDpc() {
+               return dpc;
+       }
+
+       /**
      * Assigns signalling channels.
      *
      * @param channels the list of available physical channels.
@@ -124,7 +138,7 @@
        try{
         //create mtp layer 3 instance
         mtp3 = new Mtp3(name);
-       logger.info("Created MTP layer 3");
+        logger.info("Created MTP layer 3");

         //assigning physical channel
         mtp3.setChannels(channels);
=======================================
--- /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/Mtp3.java Sun Jun 6 13:13:16 2010
+++ /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/Mtp3.java Mon Jun 7 02:25:02 2010
@@ -102,7 +102,7 @@
     private Logger logger = Logger.getLogger(Mtp3.class);

     private ScheduledExecutorService mtpTimer = Utils.getMtpTimer();
-
+
     private SelectorFactory selectorFactory;
     protected LinkSelector selector;

@@ -118,8 +118,8 @@
     }

     public void setSelectorFactory(SelectorFactory selectorFactory) {
-       this.selectorFactory = selectorFactory;
-       selector = new LinkSelector(selectorFactory.getSelector());
+       this.selectorFactory = selectorFactory;
+       selector = new LinkSelector(selectorFactory.getSelector());
     }

     /**
@@ -127,7 +127,7 @@
      *
      * @param opc the originated point code
      */
-    protected void setOpc(int opc) {
+    public void setOpc(int opc) {
         this.opc = opc;
     }

@@ -136,16 +136,30 @@
      *
      * @param dpc destination point code in decimal format.
      */
-    protected void setDpc(int dpc) {
+    public void setDpc(int dpc) {
         this.dpc = dpc;
     }

     /**
+        * @return the dpc
+        */
+       public int getDpc() {
+               return dpc;
+       }
+
+       /**
+        * @return the opc
+        */
+       public int getOpc() {
+               return opc;
+       }
+
+       /**
      * Assigning channels.
      *
      * @param channels the list of signaling channels.
      */
-    protected void setChannels(List<Mtp1> channels) {
+    public void setChannels(List<Mtp1> channels) {
         //creating mtp layer 2 for each channel specified
         for (Mtp1 channel : channels) {
            Mtp2 link = new Mtp2(this, channel, channel.getCode());
=======================================
--- /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/MtpUser.java Thu May 13 07:58:18 2010
+++ /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/MtpUser.java Mon Jun 7 02:25:02 2010
@@ -16,10 +16,11 @@
        public void linkDown();

        /**
-        *
+        * Callback from Layer4+. It expects properly encoded MTP3 message. It forwards data to MTP3
         * @param msgBuff
         */
        public void receive(byte[] msgBuff);
+
        public void receive(String msg);

        public void setMtp3(Mtp3 mtp);
=======================================
--- /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/MTPListener.java Tue May 4 05:24:47 2010
+++ /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/MTPListener.java Mon Jun 7 02:25:02 2010
@@ -1,9 +1,19 @@
 package org.mobicents.protocols.ss7.stream;

 public interface MTPListener {
-
+
+       /**
+        * Called once proper MSU has been streamed
+        * @param msg
+        */
        public void receive(byte[] msg);
+       /**
+        * Indicates that status message - linkUp has been received
+        */
        public void linkUp();
+       /**
+        * Indicates that either peer sent status message - linkDown or connection has been lost
+        */
        public void linkDown();

 }
=======================================
--- /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/MTPProvider.java Wed May 5 03:09:01 2010
+++ /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/MTPProvider.java Mon Jun 7 02:25:02 2010
@@ -2,13 +2,37 @@

 import java.io.IOException;

-public interface MTPProvider {
-
+import org.mobicents.protocols.ss7.stream.tcp.StartFailedException;
+
+public interface MTPProvider {
+
+       /**
+        * Add MTPListener for callbacks
+        * @param lst
+        */
        public void addMtpListener(MTPListener lst);
-
+       /**
+        * Remote listener
+        * @param lst
+        */
        public void removeMtpListener(MTPListener lst);
-
+       /**
+        * Push data down the stream - it expects properly formed Mtp3 messaghe
+        * @param msg
+        * @throws IOException
+        */
        public void send(byte[] msg) throws IOException;

+       /**
+        * Stop provider.
+        * @throws IllegalStateException
+        */
        public void stop() throws IllegalStateException;
-}
+       /**
+        * Start provider
+        * @return
+        * @throws StartFailedException
+        * @throws IllegalStateException
+        */
+       public void start() throws StartFailedException,IllegalStateException;
+}
=======================================
--- /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/StreamForwarder.java Fri May 7 00:31:54 2010
+++ /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/StreamForwarder.java Mon Jun 7 02:25:02 2010
@@ -15,13 +15,34 @@
  */
 public interface StreamForwarder {

+       /**
+        * Set port to listen for incoming connections
+        * @param port
+        */
        public void setPort(int port);
        public int getPort();
+       /**
+        * Set address to listen for incoming connections
+        * @param address
+        * @throws UnknownHostException
+        */
        public void setAddress(String address) throws UnknownHostException;
        public String getAddress();

+       /**
+        * Set layer3 - this is callback method for MTP class.
+        * @param layer3
+        */
        public void setMtp3(Mtp3 layer3);
+
+       /**
+        * Start server
+        * @throws Exception
+        */
        public void start() throws Exception;
+       /**
+        * Stop server
+        */
        public void stop();

 }
=======================================
--- /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/tcp/M3UserAgent.java Tue May 18 03:12:28 2010
+++ /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/tcp/M3UserAgent.java Mon Jun 7 02:25:02 2010
@@ -7,16 +7,13 @@
 import java.net.Socket;
 import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
 import java.nio.channels.ClosedSelectorException;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.ServerSocketChannel;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.spi.SelectorProvider;
-import java.util.Arrays;
 import java.util.Iterator;
-import java.util.Set;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -86,10 +83,10 @@
                this.txBuff.limit(0);
        }

-       ///////////////////////
+       // /////////////////////
        // Setters & Getters //
-       ///////////////////////
-
+       // /////////////////////
+
        public String getAddress() {
                return this.address.toString();
        }
@@ -111,8 +108,6 @@
                }

        }
-
-

        public void setMtp3(Mtp3 mtp) {
                this.mtp = mtp;
@@ -121,15 +116,14 @@
                }

        }
-
-
+
        /**
         * @return the connected
         */
        public boolean isConnected() {
                return connected;
        }
-
+
        public void start() throws IOException {
                this.connectSelector = SelectorProvider.provider().openSelector();
                this.serverSocketChannel = ServerSocketChannel.open();
@@ -142,10 +136,9 @@
                // Register the server socket channel, indicating an interest in
                // accepting new connections
                serverSocketChannel.register(this.connectSelector, SelectionKey.OP_ACCEPT);
-               if(logger.isInfoEnabled())
-                       {
-                               logger.info("Initiaited server on: " + this.address + ":" + this.port);
-                       }
+               if (logger.isInfoEnabled()) {
+                       logger.info("Initiaited server on: " + this.address + ":" + this.port);
+               }

                runnable = true;
                this.runFuture = this.executor.submit(this);
@@ -178,18 +171,14 @@
                disconnect();
        }

-       public void run()
-       {
-               while(runnable)
-               {
-                       try
-                       {
+       public void run() {
+               while (runnable) {
+                       try {
                                Iterator selectedKeys = null;

                                // Wait for an event one of the registered channels
                                if (!connected) {

-
                                        // block till we have someone subscribing for data.
                                        this.connectSelector.select();

@@ -197,10 +186,10 @@
                                        // operate on keys set
                                        performKeyOperations(selectedKeys);

-                               //} else if (linkUp) {
+                                       // } else if (linkUp) {
                                } else {
                                        // else we try I/O ops.
-
+
                                        if (this.readSelector.selectNow() > 0) {
                                                selectedKeys = this.readSelector.selectedKeys().iterator();
                                                // operate on keys set
@@ -218,40 +207,38 @@
                                        }

                                        if (hdlcHandler.isTxBufferEmpty()) {
-//                                             synchronized(this.writeSelector)
-//                                             {
-//                                                     this.writeSelector.wait(5);
-//                                             }
-
+                                               // synchronized(this.writeSelector)
+                                               // {
+                                               // this.writeSelector.wait(5);
+                                               // }
+
                                        }
                                }
-                       }catch(ClosedSelectorException cse)
-                       {
+                       } catch (ClosedSelectorException cse) {
                                cse.printStackTrace();
-                               //check for server selector?
+                               // check for server selector?
                                disconnect();
                        } catch (Exception e) {
                                // TODO Auto-generated catch block
                                e.printStackTrace();
                        }
-
-               }
-
-
-       }
-
+
+               }
+
+       }
+
        private void performKeyOperations(Iterator<SelectionKey> selectedKeys) throws IOException {

                while (selectedKeys.hasNext()) {

                        SelectionKey key = selectedKeys.next();
-                       //THIS MUST BE PRESENT!
+                       // THIS MUST BE PRESENT!
                        selectedKeys.remove();

                        if (!key.isValid()) {
                                // handle disconnect here?
-                               if(logger.isInfoEnabled()){
-                                       logger.info("Key became invalid: "+key);
+                               if (logger.isInfoEnabled()) {
+                                       logger.info("Key became invalid: " + key);
                                }
                                continue;
                        }
@@ -270,96 +257,85 @@
                }

        }
-
+
        private void accept(SelectionKey key) throws IOException {
-               if(connected)
-               {
-                       if(logger.isInfoEnabled())
-                       {
+               if (connected) {
+                       if (logger.isInfoEnabled()) {
                                logger.info("Second client not supported yet.");
                        }
-
+
                        return;
                }
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
-
-
+
                this.channel = serverSocketChannel.accept();
                this.writeSelector = SelectorProvider.provider().openSelector();
                this.readSelector = SelectorProvider.provider().openSelector();
                Socket socket = channel.socket();
-
+
                this.channel.configureBlocking(false);
                this.channel.register(this.readSelector, SelectionKey.OP_READ);
                this.channel.register(this.writeSelector, SelectionKey.OP_WRITE);
-

                this.connected = true;
-
+
                if (logger.isInfoEnabled()) {
                        logger.info("Estabilished connection with: " + socket.getInetAddress() + ":" + socket.getPort());
-
-               }
-
-               //if (connected) {
-               //      serverSocketChannel.close();
-               //      return;
-               //}
-               //lets strean state
-               if(linkUp)
-               {
+
+               }
+
+               // if (connected) {
+               // serverSocketChannel.close();
+               // return;
+               // }
+               // lets strean state
+               if (linkUp) {
                        this.streamData(_LINK_STATE_UP);
-               }else
-               {
-                       //this.streamData(_LINK_STATE_DOWN);
+               } else {
+                       // this.streamData(_LINK_STATE_DOWN);
                }
        }
-
+
        private void write(SelectionKey key) throws IOException {
-
+
                SocketChannel socketChannel = (SocketChannel) key.channel();

                // Write until there's not more data ?
-
-               //while (!txBuffer.isEmpty()) {
-               if(txBuff.remaining()>0)
-               {
-                       int  sentDataCount = socketChannel.write(txBuff);
-
-                       if(txBuff.remaining()>0)
-                       {
-                               //buffer filled.
+
+               // while (!txBuffer.isEmpty()) {
+               if (txBuff.remaining() > 0) {
+                       int sentDataCount = socketChannel.write(txBuff);
+
+                       if (txBuff.remaining() > 0) {
+                               // buffer filled.
                                return;
-                       }else
-                       {
-
+                       } else {
+
                        }
                }
-               //while (!this.hdlcHandler.isTxBufferEmpty()) {
+               // while (!this.hdlcHandler.isTxBufferEmpty()) {
                if (!this.hdlcHandler.isTxBufferEmpty()) {

-                       //ByteBuffer buf = (ByteBuffer) txBuffer.get(0);
+                       // ByteBuffer buf = (ByteBuffer) txBuffer.get(0);
                        txBuff.clear();

                        this.hdlcHandler.processTx(txBuff);
                        txBuff.flip();
                        socketChannel.write(txBuff);

-                       //if (buf.remaining() > 0) {
-                       if(txBuff.remaining()>0)
-                       {
+                       // if (buf.remaining() > 0) {
+                       if (txBuff.remaining() > 0) {
                                // ... or the socket's buffer fills up
                                return;
                        }
-                       //buf.clear();
-                       //txBuff.clear();
-                       //txBuffer.remove(0);
-
+                       // buf.clear();
+                       // txBuff.clear();
+                       // txBuffer.remove(0);

                }

        }
-
+
        private void read(SelectionKey key) throws IOException {
                SocketChannel socketChannel = (SocketChannel) key.channel();

@@ -385,64 +361,57 @@
                        handleClose(key);
                        return;
                }
-               //pass it on.
+               // pass it on.
                ByteBuffer[] readResult = null;

-               //This will read everything, and if there is incomplete frame, it will retain its partial content
-               //so on next read it can continue to decode!
+               // This will read everything, and if there is incomplete frame, it will
+               // retain its partial content
+               // so on next read it can continue to decode!
                this.readBuff.flip();
-
-               while((readResult = this.hdlcHandler.processRx(this.readBuff))!=null)
-               {
-                       for(ByteBuffer b:readResult)
-                       {
-
-                               //byte sls = b.get();
-                               //byte linksetId = b.get();
-                               //this.layer3.send(sls,linksetId,si, ssf, b.array());
+
+               while ((readResult = this.hdlcHandler.processRx(this.readBuff)) != null) {
+                       for (ByteBuffer b : readResult) {
+
+                               // byte sls = b.get();
+                               // byte linksetId = b.get();
+                               // this.layer3.send(sls,linksetId,si, ssf, b.array());
                                TLVInputStream tlvInputStream = new TLVInputStream(new ByteArrayInputStream(b.array()));
                                int tag = tlvInputStream.readTag();
-                               if(tag == Tag._TAG_LINK_DATA)
-                               {
+                               if (tag == Tag._TAG_LINK_DATA) {
                                        byte[] data = tlvInputStream.readLinkData();
-
-                                       this.mtp.send( data);
-                               }else if (tag == Tag._TAG_LINK_STATUS)
-                               {
+
+                                       this.mtp.send(data);
+                               } else if (tag == Tag._TAG_LINK_STATUS) {
                                        LinkStatus ls = tlvInputStream.readLinkStatus();
-                                       switch(ls)
-                                       {
-                                               case Query:
-                                                       if(this.linkUp)
-                                                       {
-                                                               this.streamData(_LINK_STATE_UP);
-                                                       }else
-                                                       {
-                                                               this.streamData(_LINK_STATE_DOWN);
-                                                       }
-
-                                       }
-                               }else
-                               {
+                                       switch (ls) {
+                                       case Query:
+                                               if (this.linkUp) {
+                                                       this.streamData(_LINK_STATE_UP);
+                                               } else {
+                                                       this.streamData(_LINK_STATE_DOWN);
+                                               }
+
+                                       }
+                               } else {
                                        logger.warn("Received weird message!");
                                }
                        }
                }
                this.readBuff.clear();
-               //this.layer3.send(si, ssf, this.readBuff.array());
+               // this.layer3.send(si, ssf, this.readBuff.array());

        }
+
        private void handleClose(SelectionKey key) {
-               if(logger.isDebugEnabled())
-        {
-               logger.debug("Handling key close operations: " + key);
+               if (logger.isDebugEnabled()) {
+                       logger.debug("Handling key close operations: " + key);
                }
                linkDown();
                try {
                        disconnect();
                } finally {
                        // linkDown();
-                       //connected = false;
+                       // connected = false;
                        // synchronized (this.hdlcHandler) {
                        synchronized (this.writeSelector) {
                                // this is to ensure buffer does not have any bad data.
@@ -453,9 +422,9 @@
                }
                return;
        }
-
+
        private void disconnect() {
-
+
                if (this.channel != null) {
                        try {
                                this.channel.close();
@@ -485,16 +454,17 @@
                }
                this.connected = false;

-
-       }
+       }
+
        /**
-        * Method used internaly - this one sends actuall data to remote end.
+        * Method used internally - this one sends actual data to remote end.
+        *
         * @param data
         */
        public void streamData(byte[] data) {
-               //if(this.channel!=null)
-               //      connected = this.channel.isConnected();
-
+               // if(this.channel!=null)
+               // connected = this.channel.isConnected();
+
                if (!connected) {
                        if (logger.isInfoEnabled()) {
                                logger.info("There is no client interested in data stream, ignoring. Message should be retransmited.");
@@ -504,11 +474,11 @@
                }

                // And queue the data we want written
-               //synchronized (this.txBuffer) {
-               //synchronized (this.hdlcHandler) {
+               // synchronized (this.txBuffer) {
+               // synchronized (this.hdlcHandler) {
                synchronized (this.writeSelector) {

-                       //this.txBuffer.add(ByteBuffer.wrap(data));
+                       // this.txBuffer.add(ByteBuffer.wrap(data));
                        ByteBuffer bb = ByteBuffer.allocate(data.length);
                        bb.put(data);
                        bb.flip();
@@ -517,21 +487,21 @@
                        // changes
                        this.writeSelector.wakeup();
                }
-
-
-       }
-       ////////////
+
+       }
+
+       // //////////
        // LAYER4 //
-       ////////////
+       // //////////
        public void linkDown() {
                if (logger.isInfoEnabled()) {
                        logger.info("Received L4 Down event from layer3.");
                }
                this.linkUp = false;
-               //FIXME: proper actions here.
-               //this.txBuff.clear();
-               //this.txBuff.limit(0);
-               //this.readBuff.clear();
+               // FIXME: proper actions here.
+               // this.txBuff.clear();
+               // this.txBuff.limit(0);
+               // this.readBuff.clear();
                this.streamData(_LINK_STATE_DOWN);
        }

@@ -542,30 +512,28 @@
                this.linkUp = true;
                this.streamData(_LINK_STATE_UP);
        }
-       public void receive(String msg)
-       {
+
+       public void receive(String msg) {
                this.receive(msg.getBytes());
-
-       }
-
-       public void receive( byte[] msgBuff) {
-
+
+       }
+
+       public void receive(byte[] msgBuff) {

                // layer3 has something important, lets write.
-               //if(linkUp)
-               //{
-                       TLVOutputStream tlv = new TLVOutputStream();
-                       try {
-                               tlv.writeData(msgBuff);
-                               byte[] data = tlv.toByteArray();
-                               this.streamData(data);
-                       } catch (IOException e) {
-                               // TODO Auto-generated catch block
-                               e.printStackTrace();
-                       }
-
-               //}
+               // if(linkUp)
+               // {
+               TLVOutputStream tlv = new TLVOutputStream();
+               try {
+                       tlv.writeData(msgBuff);
+                       byte[] data = tlv.toByteArray();
+                       this.streamData(data);
+               } catch (IOException e) {
+                       // TODO Auto-generated catch block
+                       e.printStackTrace();
+               }
+
+               // }
        }

-
-}
+}
=======================================
--- /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/tcp/M3UserConnector.java Tue May 18 03:12:28 2010
+++ /trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/tcp/M3UserConnector.java Mon Jun 7 02:25:02 2010
@@ -7,14 +7,11 @@
 import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.nio.ByteBuffer;
-import java.nio.channels.Channel;
-import java.nio.channels.SelectableChannel;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.Selector;
 import java.nio.channels.SocketChannel;
 import java.nio.channels.spi.SelectorProvider;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
@@ -23,11 +20,9 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;

-import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.mobicents.protocols.ss7.stream.HDLCHandler;
 import org.mobicents.protocols.ss7.stream.MTPListener;
-
 import org.mobicents.protocols.ss7.stream.tlv.LinkStatus;
 import org.mobicents.protocols.ss7.stream.tlv.TLVInputStream;
 import org.mobicents.protocols.ss7.stream.tlv.TLVOutputStream;
@@ -73,7 +68,8 @@

        public M3UserConnector(Properties properties) {
                this();
-               this.properties.putAll(properties);
+               if(properties!=null)
+                       this.properties.putAll(properties);
        }
        public M3UserConnector() {
                super();
@@ -124,6 +120,30 @@

        }

+       /**
+        * @return the serverAddress
+        */
+       public String getServerAddress() {
+               return serverAddress;
+       }
+       /**
+        * @param serverAddress the serverAddress to set
+        */
+       public void setServerAddress(String serverAddress) {
+               this.serverAddress = serverAddress;
+       }
+       /**
+        * @return the serverPort
+        */
+       public int getServerPort() {
+               return serverPort;
+       }
+       /**
+        * @param serverPort the serverPort to set
+        */
+       public void setServerPort(int serverPort) {
+               this.serverPort = serverPort;
+       }
        private void readProperties() {
                serverPort = Integer.parseInt(properties.getProperty(_PROPERTY_PORT, "" + serverPort));
                serverAddress = properties.getProperty(_PROPERTY_IP, "" + serverAddress);

Reply via email to