Revision: 12091
Author: baranowb
Date: Mon Jun 7 02:25:02 2010
Log: some fixes.... on B2 more is a MUST
http://code.google.com/p/mobicents/source/detail?r=12091
Modified:
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/MTP.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/Mtp3.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/MtpUser.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/MTPListener.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/MTPProvider.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/StreamForwarder.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/tcp/M3UserAgent.java
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/tcp/M3UserConnector.java
=======================================
---
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/MTP.java
Fri May 7 00:31:54 2010
+++
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/MTP.java
Mon Jun 7 02:25:02 2010
@@ -105,6 +105,20 @@
}
/**
+ * @return the opc
+ */
+ public int getOpc() {
+ return opc;
+ }
+
+ /**
+ * @return the dpc
+ */
+ public int getDpc() {
+ return dpc;
+ }
+
+ /**
* Assigns signalling channels.
*
* @param channels the list of available physical channels.
@@ -124,7 +138,7 @@
try{
//create mtp layer 3 instance
mtp3 = new Mtp3(name);
- logger.info("Created MTP layer 3");
+ logger.info("Created MTP layer 3");
//assigning physical channel
mtp3.setChannels(channels);
=======================================
---
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/Mtp3.java
Sun Jun 6 13:13:16 2010
+++
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/Mtp3.java
Mon Jun 7 02:25:02 2010
@@ -102,7 +102,7 @@
private Logger logger = Logger.getLogger(Mtp3.class);
private ScheduledExecutorService mtpTimer = Utils.getMtpTimer();
-
+
private SelectorFactory selectorFactory;
protected LinkSelector selector;
@@ -118,8 +118,8 @@
}
public void setSelectorFactory(SelectorFactory selectorFactory) {
- this.selectorFactory = selectorFactory;
- selector = new LinkSelector(selectorFactory.getSelector());
+ this.selectorFactory = selectorFactory;
+ selector = new LinkSelector(selectorFactory.getSelector());
}
/**
@@ -127,7 +127,7 @@
*
* @param opc the originated point code
*/
- protected void setOpc(int opc) {
+ public void setOpc(int opc) {
this.opc = opc;
}
@@ -136,16 +136,30 @@
*
* @param dpc destination point code in decimal format.
*/
- protected void setDpc(int dpc) {
+ public void setDpc(int dpc) {
this.dpc = dpc;
}
/**
+ * @return the dpc
+ */
+ public int getDpc() {
+ return dpc;
+ }
+
+ /**
+ * @return the opc
+ */
+ public int getOpc() {
+ return opc;
+ }
+
+ /**
* Assigning channels.
*
* @param channels the list of signaling channels.
*/
- protected void setChannels(List<Mtp1> channels) {
+ public void setChannels(List<Mtp1> channels) {
//creating mtp layer 2 for each channel specified
for (Mtp1 channel : channels) {
Mtp2 link = new Mtp2(this, channel, channel.getCode());
=======================================
---
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/MtpUser.java
Thu May 13 07:58:18 2010
+++
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/mtp/MtpUser.java
Mon Jun 7 02:25:02 2010
@@ -16,10 +16,11 @@
public void linkDown();
/**
- *
+ * Callback from Layer4+. It expects a properly encoded MTP3 message and forwards the data to MTP3.
* @param msgBuff
*/
public void receive(byte[] msgBuff);
+
public void receive(String msg);
public void setMtp3(Mtp3 mtp);
=======================================
---
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/MTPListener.java
Tue May 4 05:24:47 2010
+++
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/MTPListener.java
Mon Jun 7 02:25:02 2010
@@ -1,9 +1,19 @@
package org.mobicents.protocols.ss7.stream;
public interface MTPListener {
-
+
+ /**
+ * Called once proper MSU has been streamed
+ * @param msg
+ */
public void receive(byte[] msg);
+ /**
+ * Indicates that status message - linkUp has been received
+ */
public void linkUp();
+ /**
+ * Indicates that either the peer sent a status message - linkDown - or the connection has been lost
+ */
public void linkDown();
}
=======================================
---
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/MTPProvider.java
Wed May 5 03:09:01 2010
+++
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/MTPProvider.java
Mon Jun 7 02:25:02 2010
@@ -2,13 +2,37 @@
import java.io.IOException;
-public interface MTPProvider {
-
+import org.mobicents.protocols.ss7.stream.tcp.StartFailedException;
+
+public interface MTPProvider {
+
+ /**
+ * Add MTPListener for callbacks
+ * @param lst
+ */
public void addMtpListener(MTPListener lst);
-
+ /**
+ * Remove listener
+ * @param lst
+ */
public void removeMtpListener(MTPListener lst);
-
+ /**
+ * Push data down the stream - it expects a properly formed Mtp3 message
+ * @param msg
+ * @throws IOException
+ */
public void send(byte[] msg) throws IOException;
+ /**
+ * Stop provider.
+ * @throws IllegalStateException
+ */
public void stop() throws IllegalStateException;
-}
+ /**
+ * Start provider
+ * @return
+ * @throws StartFailedException
+ * @throws IllegalStateException
+ */
+ public void start() throws StartFailedException,IllegalStateException;
+}
=======================================
---
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/StreamForwarder.java
Fri May 7 00:31:54 2010
+++
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/StreamForwarder.java
Mon Jun 7 02:25:02 2010
@@ -15,13 +15,34 @@
*/
public interface StreamForwarder {
+ /**
+ * Set port to listen for incoming connections
+ * @param port
+ */
public void setPort(int port);
public int getPort();
+ /**
+ * Set address to listen for incoming connections
+ * @param address
+ * @throws UnknownHostException
+ */
public void setAddress(String address) throws UnknownHostException;
public String getAddress();
+ /**
+ * Set layer3 - this is callback method for MTP class.
+ * @param layer3
+ */
public void setMtp3(Mtp3 layer3);
+
+ /**
+ * Start server
+ * @throws Exception
+ */
public void start() throws Exception;
+ /**
+ * Stop server
+ */
public void stop();
}
=======================================
---
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/tcp/M3UserAgent.java
Tue May 18 03:12:28 2010
+++
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/tcp/M3UserAgent.java
Mon Jun 7 02:25:02 2010
@@ -7,16 +7,13 @@
import java.net.Socket;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
-import java.nio.channels.ClosedChannelException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
-import java.util.Arrays;
import java.util.Iterator;
-import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -86,10 +83,10 @@
this.txBuff.limit(0);
}
- ///////////////////////
+ // /////////////////////
// Setters & Getters //
- ///////////////////////
-
+ // /////////////////////
+
public String getAddress() {
return this.address.toString();
}
@@ -111,8 +108,6 @@
}
}
-
-
public void setMtp3(Mtp3 mtp) {
this.mtp = mtp;
@@ -121,15 +116,14 @@
}
}
-
-
+
/**
* @return the connected
*/
public boolean isConnected() {
return connected;
}
-
+
public void start() throws IOException {
this.connectSelector =
SelectorProvider.provider().openSelector();
this.serverSocketChannel = ServerSocketChannel.open();
@@ -142,10 +136,9 @@
// Register the server socket channel, indicating an interest in
// accepting new connections
serverSocketChannel.register(this.connectSelector,
SelectionKey.OP_ACCEPT);
- if(logger.isInfoEnabled())
- {
- logger.info("Initiaited server on: " + this.address +
":" + this.port);
- }
+ if (logger.isInfoEnabled()) {
+ logger.info("Initiaited server on: " + this.address +
":" + this.port);
+ }
runnable = true;
this.runFuture = this.executor.submit(this);
@@ -178,18 +171,14 @@
disconnect();
}
- public void run()
- {
- while(runnable)
- {
- try
- {
+ public void run() {
+ while (runnable) {
+ try {
Iterator selectedKeys = null;
// Wait for an event one of the registered
channels
if (!connected) {
-
// block till we have someone
subscribing for data.
this.connectSelector.select();
@@ -197,10 +186,10 @@
// operate on keys set
performKeyOperations(selectedKeys);
- //} else if (linkUp) {
+ // } else if (linkUp) {
} else {
// else we try I/O ops.
-
+
if (this.readSelector.selectNow() > 0) {
selectedKeys =
this.readSelector.selectedKeys().iterator();
// operate on keys set
@@ -218,40 +207,38 @@
}
if (hdlcHandler.isTxBufferEmpty()) {
-// synchronized(this.writeSelector)
-// {
-//
this.writeSelector.wait(5);
-// }
-
+ //
synchronized(this.writeSelector)
+ // {
+ // this.writeSelector.wait(5);
+ // }
+
}
}
- }catch(ClosedSelectorException cse)
- {
+ } catch (ClosedSelectorException cse) {
cse.printStackTrace();
- //check for server selector?
+ // check for server selector?
disconnect();
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
-
- }
-
-
- }
-
+
+ }
+
+ }
+
private void performKeyOperations(Iterator<SelectionKey> selectedKeys)
throws IOException {
while (selectedKeys.hasNext()) {
SelectionKey key = selectedKeys.next();
- //THIS MUST BE PRESENT!
+ // THIS MUST BE PRESENT!
selectedKeys.remove();
if (!key.isValid()) {
// handle disconnect here?
- if(logger.isInfoEnabled()){
- logger.info("Key became invalid: "+key);
+ if (logger.isInfoEnabled()) {
+ logger.info("Key became invalid: " +
key);
}
continue;
}
@@ -270,96 +257,85 @@
}
}
-
+
private void accept(SelectionKey key) throws IOException {
- if(connected)
- {
- if(logger.isInfoEnabled())
- {
+ if (connected) {
+ if (logger.isInfoEnabled()) {
logger.info("Second client not supported yet.");
}
-
+
return;
}
ServerSocketChannel serverSocketChannel = (ServerSocketChannel)
key.channel();
-
-
+
this.channel = serverSocketChannel.accept();
this.writeSelector = SelectorProvider.provider().openSelector();
this.readSelector = SelectorProvider.provider().openSelector();
Socket socket = channel.socket();
-
+
this.channel.configureBlocking(false);
this.channel.register(this.readSelector, SelectionKey.OP_READ);
this.channel.register(this.writeSelector,
SelectionKey.OP_WRITE);
-
this.connected = true;
-
+
if (logger.isInfoEnabled()) {
logger.info("Estabilished connection with: " + socket.getInetAddress()
+ ":" + socket.getPort());
-
- }
-
- //if (connected) {
- // serverSocketChannel.close();
- // return;
- //}
- //lets strean state
- if(linkUp)
- {
+
+ }
+
+ // if (connected) {
+ // serverSocketChannel.close();
+ // return;
+ // }
+ // lets strean state
+ if (linkUp) {
this.streamData(_LINK_STATE_UP);
- }else
- {
- //this.streamData(_LINK_STATE_DOWN);
+ } else {
+ // this.streamData(_LINK_STATE_DOWN);
}
}
-
+
private void write(SelectionKey key) throws IOException {
-
+
SocketChannel socketChannel = (SocketChannel) key.channel();
// Write until there's not more data ?
-
- //while (!txBuffer.isEmpty()) {
- if(txBuff.remaining()>0)
- {
- int sentDataCount = socketChannel.write(txBuff);
-
- if(txBuff.remaining()>0)
- {
- //buffer filled.
+
+ // while (!txBuffer.isEmpty()) {
+ if (txBuff.remaining() > 0) {
+ int sentDataCount = socketChannel.write(txBuff);
+
+ if (txBuff.remaining() > 0) {
+ // buffer filled.
return;
- }else
- {
-
+ } else {
+
}
}
- //while (!this.hdlcHandler.isTxBufferEmpty()) {
+ // while (!this.hdlcHandler.isTxBufferEmpty()) {
if (!this.hdlcHandler.isTxBufferEmpty()) {
- //ByteBuffer buf = (ByteBuffer) txBuffer.get(0);
+ // ByteBuffer buf = (ByteBuffer) txBuffer.get(0);
txBuff.clear();
this.hdlcHandler.processTx(txBuff);
txBuff.flip();
socketChannel.write(txBuff);
- //if (buf.remaining() > 0) {
- if(txBuff.remaining()>0)
- {
+ // if (buf.remaining() > 0) {
+ if (txBuff.remaining() > 0) {
// ... or the socket's buffer fills up
return;
}
- //buf.clear();
- //txBuff.clear();
- //txBuffer.remove(0);
-
+ // buf.clear();
+ // txBuff.clear();
+ // txBuffer.remove(0);
}
}
-
+
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
@@ -385,64 +361,57 @@
handleClose(key);
return;
}
- //pass it on.
+ // pass it on.
ByteBuffer[] readResult = null;
- //This will read everything, and if there is incomplete frame, it will
retain its partial content
- //so on next read it can continue to decode!
+ // This will read everything, and if there is incomplete frame,
it will
+ // retain its partial content
+ // so on next read it can continue to decode!
this.readBuff.flip();
-
- while((readResult =
this.hdlcHandler.processRx(this.readBuff))!=null)
- {
- for(ByteBuffer b:readResult)
- {
-
- //byte sls = b.get();
- //byte linksetId = b.get();
- //this.layer3.send(sls,linksetId,si, ssf,
b.array());
+
+ while ((readResult = this.hdlcHandler.processRx(this.readBuff)) != null)
{
+ for (ByteBuffer b : readResult) {
+
+ // byte sls = b.get();
+ // byte linksetId = b.get();
+ // this.layer3.send(sls,linksetId,si, ssf,
b.array());
TLVInputStream tlvInputStream = new TLVInputStream(new
ByteArrayInputStream(b.array()));
int tag = tlvInputStream.readTag();
- if(tag == Tag._TAG_LINK_DATA)
- {
+ if (tag == Tag._TAG_LINK_DATA) {
byte[] data =
tlvInputStream.readLinkData();
-
- this.mtp.send( data);
- }else if (tag == Tag._TAG_LINK_STATUS)
- {
+
+ this.mtp.send(data);
+ } else if (tag == Tag._TAG_LINK_STATUS) {
LinkStatus ls =
tlvInputStream.readLinkStatus();
- switch(ls)
- {
- case Query:
- if(this.linkUp)
- {
-
this.streamData(_LINK_STATE_UP);
- }else
- {
-
this.streamData(_LINK_STATE_DOWN);
- }
-
- }
- }else
- {
+ switch (ls) {
+ case Query:
+ if (this.linkUp) {
+
this.streamData(_LINK_STATE_UP);
+ } else {
+
this.streamData(_LINK_STATE_DOWN);
+ }
+
+ }
+ } else {
logger.warn("Received weird message!");
}
}
}
this.readBuff.clear();
- //this.layer3.send(si, ssf, this.readBuff.array());
+ // this.layer3.send(si, ssf, this.readBuff.array());
}
+
private void handleClose(SelectionKey key) {
- if(logger.isDebugEnabled())
- {
- logger.debug("Handling key close operations: " + key);
+ if (logger.isDebugEnabled()) {
+ logger.debug("Handling key close operations: " + key);
}
linkDown();
try {
disconnect();
} finally {
// linkDown();
- //connected = false;
+ // connected = false;
// synchronized (this.hdlcHandler) {
synchronized (this.writeSelector) {
// this is to ensure buffer does not have any
bad data.
@@ -453,9 +422,9 @@
}
return;
}
-
+
private void disconnect() {
-
+
if (this.channel != null) {
try {
this.channel.close();
@@ -485,16 +454,17 @@
}
this.connected = false;
-
- }
+ }
+
/**
- * Method used internaly - this one sends actuall data to remote end.
+ * Method used internally - this one sends actual data to remote end.
+ *
* @param data
*/
public void streamData(byte[] data) {
- //if(this.channel!=null)
- // connected = this.channel.isConnected();
-
+ // if(this.channel!=null)
+ // connected = this.channel.isConnected();
+
if (!connected) {
if (logger.isInfoEnabled()) {
logger.info("There is no client interested in data stream, ignoring.
Message should be retransmited.");
@@ -504,11 +474,11 @@
}
// And queue the data we want written
- //synchronized (this.txBuffer) {
- //synchronized (this.hdlcHandler) {
+ // synchronized (this.txBuffer) {
+ // synchronized (this.hdlcHandler) {
synchronized (this.writeSelector) {
- //this.txBuffer.add(ByteBuffer.wrap(data));
+ // this.txBuffer.add(ByteBuffer.wrap(data));
ByteBuffer bb = ByteBuffer.allocate(data.length);
bb.put(data);
bb.flip();
@@ -517,21 +487,21 @@
// changes
this.writeSelector.wakeup();
}
-
-
- }
- ////////////
+
+ }
+
+ // //////////
// LAYER4 //
- ////////////
+ // //////////
public void linkDown() {
if (logger.isInfoEnabled()) {
logger.info("Received L4 Down event from layer3.");
}
this.linkUp = false;
- //FIXME: proper actions here.
- //this.txBuff.clear();
- //this.txBuff.limit(0);
- //this.readBuff.clear();
+ // FIXME: proper actions here.
+ // this.txBuff.clear();
+ // this.txBuff.limit(0);
+ // this.readBuff.clear();
this.streamData(_LINK_STATE_DOWN);
}
@@ -542,30 +512,28 @@
this.linkUp = true;
this.streamData(_LINK_STATE_UP);
}
- public void receive(String msg)
- {
+
+ public void receive(String msg) {
this.receive(msg.getBytes());
-
- }
-
- public void receive( byte[] msgBuff) {
-
+
+ }
+
+ public void receive(byte[] msgBuff) {
// layer3 has something important, lets write.
- //if(linkUp)
- //{
- TLVOutputStream tlv = new TLVOutputStream();
- try {
- tlv.writeData(msgBuff);
- byte[] data = tlv.toByteArray();
- this.streamData(data);
- } catch (IOException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
- //}
+ // if(linkUp)
+ // {
+ TLVOutputStream tlv = new TLVOutputStream();
+ try {
+ tlv.writeData(msgBuff);
+ byte[] data = tlv.toByteArray();
+ this.streamData(data);
+ } catch (IOException e) {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+
+ // }
}
-
-}
+}
=======================================
---
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/tcp/M3UserConnector.java
Tue May 18 03:12:28 2010
+++
/trunk/protocols/ss7/mtp/src/main/java/org/mobicents/protocols/ss7/stream/tcp/M3UserConnector.java
Mon Jun 7 02:25:02 2010
@@ -7,14 +7,11 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
-import java.nio.channels.Channel;
-import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@@ -23,11 +20,9 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
-import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.mobicents.protocols.ss7.stream.HDLCHandler;
import org.mobicents.protocols.ss7.stream.MTPListener;
-
import org.mobicents.protocols.ss7.stream.tlv.LinkStatus;
import org.mobicents.protocols.ss7.stream.tlv.TLVInputStream;
import org.mobicents.protocols.ss7.stream.tlv.TLVOutputStream;
@@ -73,7 +68,8 @@
public M3UserConnector(Properties properties) {
this();
- this.properties.putAll(properties);
+ if(properties!=null)
+ this.properties.putAll(properties);
}
public M3UserConnector() {
super();
@@ -124,6 +120,30 @@
}
+ /**
+ * @return the serverAddress
+ */
+ public String getServerAddress() {
+ return serverAddress;
+ }
+ /**
+ * @param serverAddress the serverAddress to set
+ */
+ public void setServerAddress(String serverAddress) {
+ this.serverAddress = serverAddress;
+ }
+ /**
+ * @return the serverPort
+ */
+ public int getServerPort() {
+ return serverPort;
+ }
+ /**
+ * @param serverPort the serverPort to set
+ */
+ public void setServerPort(int serverPort) {
+ this.serverPort = serverPort;
+ }
private void readProperties() {
serverPort = Integer.parseInt(properties.getProperty(_PROPERTY_PORT, ""
+ serverPort));
serverAddress = properties.getProperty(_PROPERTY_IP, "" +
serverAddress);