Dear mina list members,
I try to create a new ProtoDecoder with MINA version 2.0.0-M6 for a
binary protocol which comes over the Serial transporter.
Every time when I get message from the serial input my handler is called
6 times with the same input.
Attached my code, if anybody want to take a look what I have done;-).
It would be nice when somebody can help me to find the error.
I'am sure I have missunderstand the interaction between the decoder and
the handler ;-(
Due to the fact that I'am new with mina maybe there is also a better way
to save the input then to add a IoBuffer in MyMessage.
The log output writes the following:
10925 [Thread-3] INFO LoggingFilter - RECEIVED: HeapBuffer[pos=0 lim=8
cap=8: 02 30 30 35 42 30 30 46]
10944 [Thread-3] INFO LoggingFilter - RECEIVED: HeapBuffer[pos=0
lim=12 cap=12: 46 31 30 46 46 31 32 32 32 36 32 03]
10947 [Thread-3] DEBUG MyHandler - cm tb HeapBuffer[pos=0 lim=18
cap=256: 30 30 35 42 30 30 46 46 31 30 46 46 31 32 32 32...]
10947 [Thread-3] DEBUG MyHandler - Received : HeapBuffer[pos=12 lim=12
cap=12: empty]
10947 [Thread-3] DEBUG MyHandler - session readed_bytes:18
11412 [Thread-3] INFO LoggingFilter - RECEIVED: HeapBuffer[pos=0 lim=8
cap=8: 02 30 30 35 42 30 30 46]
11433 [Thread-3] INFO LoggingFilter - RECEIVED: HeapBuffer[pos=0
lim=12 cap=12: 46 31 30 46 46 31 32 32 32 36 32 03]
11436 [Thread-3] DEBUG MyHandler - cm tb HeapBuffer[pos=0 lim=18
cap=256: 30 30 35 42 30 30 46 46 31 30 46 46 31 32 32 32...]
11436 [Thread-3] DEBUG MyHandler - Received : HeapBuffer[pos=12 lim=12
cap=12: empty]
11436 [Thread-3] DEBUG MyHandler - session readed_bytes:18
11905 [Thread-3] INFO LoggingFilter - RECEIVED: HeapBuffer[pos=0 lim=8
cap=8: 02 30 30 35 42 30 30 46]
11926 [Thread-3] INFO LoggingFilter - RECEIVED: HeapBuffer[pos=0
lim=12 cap=12: 46 31 30 46 46 31 32 32 32 36 32 03]
11929 [Thread-3] DEBUG MyHandler - cm tb HeapBuffer[pos=0 lim=18
cap=256: 30 30 35 42 30 30 46 46 31 30 46 46 31 32 32 32...]
11929 [Thread-3] DEBUG MyHandler - Received : HeapBuffer[pos=12 lim=12
cap=12: empty]
11929 [Thread-3] DEBUG MyHandler - session readed_bytes:18
next sequenze ...
###
Many many thanks
BR
Aleks
package org.ala;
import java.net.InetSocketAddress;
import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineEncoder;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.serial.SerialAddress;
import org.apache.mina.transport.serial.SerialConnector;
import org.apache.mina.transport.serial.SerialAddress.DataBits;
import org.apache.mina.transport.serial.SerialAddress.FlowControl;
import org.apache.mina.transport.serial.SerialAddress.Parity;
import org.apache.mina.transport.serial.SerialAddress.StopBits;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
public class Main {
private static final int PORT = 8080;
/**
* @param args
*/
public static void main(String[] args) throws Exception {
// TODO Auto-generated method stub
ProtocolCodecFilter pcf = new ProtocolCodecFilter(new
TextLineEncoder(),new ProtoDecoder());
/* */
IoConnector connector = new SerialConnector();
connector.getFilterChain().addFirst("logging", new
LoggingFilter());
connector.getFilterChain().addLast("codec", pcf);
connector.setConnectTimeoutMillis(30);
connector.setHandler(new MyHandler());
SerialAddress portAddress = new SerialAddress("/dev/ttyS0",
19200, DataBits.DATABITS_7, StopBits.BITS_1, Parity.EVEN, FlowControl.NONE );
try {
ConnectFuture future = connector.connect(portAddress);
future.await();
System.out.println("fu-isConn " +
future.isConnected());
if(future.isConnected()){
IoSession sessin = future.getSession();
System.out.println("fu Str " +
future.toString());
System.out.println("sess " + sessin);
//sessin.close(false);
//connector.dispose();
}else{
future.cancel();
connector.dispose();
System.exit(0);
}
//sessin.close(true);
}catch(Exception e){
e.printStackTrace();
connector.dispose();
System.exit(0);
}
//System.exit(0);
/* * /
SocketAcceptor accpetor = new NioSocketAcceptor();
accpetor.setReuseAddress( true );
accpetor.getFilterChain().addFirst("logging", new
LoggingFilter());
accpetor.getFilterChain().addLast("codec",pcf);
accpetor.setHandler(new MyHandler());
accpetor.bind(new InetSocketAddress(PORT));
System.out.println("Listeningon port " + PORT);
/ * */
}
}
package org.ala;
import org.ala.MyMessage;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyHandler extends IoHandlerAdapter {
private final Logger log;
private MyMessage cm;
public MyHandler() {
// TODO Auto-generated constructor stub
log = LoggerFactory.getLogger(MyHandler.class);
log.debug("Created MyHandler");
}
public void messageReceived(IoSession session, Object message) throws
Exception {
if(session.containsAttribute("cm")){
cm = (MyMessage) session.getAttribute("cm");
log.debug("have MyMessage " + cm );
}else{
cm = new MyMessage(session);
session.setAttribute("cm", cm);
}
if(cm.getInMsg()){
log.debug("still in body");
}else{
log.debug("cm tb " + cm.getTempBuffer());
log.debug("Received : " + message);
log.debug(" session readed_bytes:" + cm.getReaded_bytes());
if(session.containsAttribute("cm")){
cm = (MyMessage) session.getAttribute("cm");
session.removeAttribute("cm");
cm = null;
log.debug("in " + this.getClass().getName() +
".messageReceived removed MyMessage ");
}
}
}
public void sessionOpened(IoSession session) throws Exception{
log.debug("sessionOpened " + session);
}
public void sessionCreated(IoSession session) throws Exception{
session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30); //
sec
log.debug("sessionCreated " + session);
}
public void sessionClosed(IoSession session) throws Exception {
log.debug("sessionClosed " + session);
}
public void exceptionCaught(IoSession session, Throwable cause) throws
Exception{
log.debug(" session " + session + " exceptionCaught:");
log.error("exceptionCaught ", cause);
//cause.printStackTrace();
if(session.containsAttribute("cm")){
cm = (MyMessage) session.getAttribute("cm");
session.removeAttribute("cm");
cm = null;
log.debug("in " + this.getClass().getName() +
".exceptionCaught removed MyMessage ");
}
session.close(true);
}
@Override
public void sessionIdle(IoSession session, IdleStatus status) throws
Exception{
//System.out.println("sessionIdle status " +
session.getIdleCount( status));
log.debug("sessionIdle status " + session.getIdleCount(
status));
if(session.getIdleCount( status ) == 3){
if(session.containsAttribute("cm")){
cm = (MyMessage) session.getAttribute("cm");
session.removeAttribute("cm");
cm = null;
log.debug("in " + this.getClass().getName() +
".sessionIdle removed MyMessage ");
}
session.close(true);
}
}
}
package org.ala;
import java.nio.charset.CharacterCodingException;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MyMessage {
private String RRField;
private String PPField;
private String TTField;
private String P1Field;
private String P2Field;
private String DNFields;
private String MTField;
private Boolean inMsg;
private IoSession MyIoSess;
private IoBuffer tempBuffer;
private Integer readed_bytes;
private final Logger log;
public MyMessage(IoSession session){
setMyIoSess(session);
setInMsg(false);
setReaded_bytes(0);
tempBuffer = IoBuffer.allocate(256).setAutoExpand(true);
log = LoggerFactory.getLogger(MyMessage.class);
log.debug("Created MyMessage");
}
/**
* @param rRField the rRField to set
*/
public void setRRField(String rRField) {
RRField = rRField;
}
/**
* @return the rRField
*/
public String getRRField() {
return RRField;
}
/**
* @param tTField the tTField to set
*/
public void setTTField(String tTField) {
TTField = tTField;
}
/**
* @return the tTField
*/
public String getTTField() {
return TTField;
}
/**
* @param pPField the pPField to set
*/
public void setPPField(String pPField) {
PPField = pPField;
}
/**
* @return the pPField
*/
public String getPPField() {
return PPField;
}
/**
* @param p1Field the p1Field to set
*/
public void setP1Field(String p1Field) {
P1Field = p1Field;
}
/**
* @return the p1Field
*/
public String getP1Field() {
return P1Field;
}
/**
* @param p2Field the p2Field to set
*/
public void setP2Field(String p2Field) {
P2Field = p2Field;
}
/**
* @return the p2Field
*/
public String getP2Field() {
return P2Field;
}
/**
* @param dNFields the dNFields to set
*/
public void setDNFields(String dNFields) {
DNFields = dNFields;
}
/**
* @return the dNFields
*/
public String getDNFields() {
return DNFields;
}
/**
* @param mTField the mTField to set
*/
public void setMTField(String mTField) {
MTField = mTField;
}
/**
* @return the mTField
*/
public String getMTField() {
return MTField;
}
public boolean calcCheckSum(IoSession MySess) throws
CharacterCodingException{
int readed_Bytes = getReaded_bytes();
rewind_buf();
// Calculate checksumm
int chksum = 0;
int sum = 0;
for (int idx = 0; idx < readed_Bytes; idx += 2) {
log.debug(" <<<<<<<>>>>>>> tempBuffer.array(),"+ idx +",2 " +
new String(tempBuffer.array(), idx, 2));
if(idx != 16){
chksum += Integer.parseInt(new
String(tempBuffer.array(), idx, 2), 16);
}else{
sum = Integer.parseInt(new String(tempBuffer.array(),
idx, 2), 16);
}
}
log.debug("vorher chksum " + Integer.toHexString(chksum));
//chksum &= 0xFF;
int t = 0xFF-chksum;
t &=0xFF;
log.debug("t " + Integer.toHexString(t));
String st = new String(Integer.toHexString(t)).toUpperCase();
log.debug("ll " + st.equals(Integer.toHexString(sum).toUpperCase()));
if (st.equals(Integer.toHexString(sum).toUpperCase()) && readed_Bytes
>= 18) {
return true;
}
else {
return false;
}
}
/**
* @param myIoSess the myIoSess to set
*/
public void setMyIoSess(IoSession myIoSess) {
MyIoSess = myIoSess;
}
/**
* @return the myIoSess
*/
public IoSession getMyIoSess() {
return MyIoSess;
}
/**
* @param inMsg the inMsg to set
*/
public void setInMsg(Boolean inMsg) {
this.inMsg = inMsg;
}
/**
* @return the inMsg
*/
public Boolean getInMsg() {
return inMsg;
}
/**
* @param tempBuffer the tempBuffer to set
*/
public void setTempBuffer(byte tB) {
this.tempBuffer.put(tB);
}
/**
* @return the tempBuffer
*/
public IoBuffer getTempBuffer() {
return tempBuffer;
}
public void rewind_buf(){
tempBuffer.rewind();
}
/**
* @param readed_bytes the readed_bytes to set
*/
public void setReaded_bytes(Integer readed_bytes) {
this.readed_bytes = readed_bytes;
}
/**
* @return the readed_bytes
*/
public Integer getReaded_bytes() {
return readed_bytes;
}
public void incReaded_bytes(){
readed_bytes++;
}
protected void finalize() throws Throwable{
log.debug("Called " + this.getClass().getName() + ".finalize");
}
}
package org.ala;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ProtoDecoder extends CumulativeProtocolDecoder {
private final byte START_MSG = 0x02;
private final byte END_MSG = 0x03;
private final byte ACK = 0x06;
private final byte NACK = 0x16;
private final Logger log;
public ProtoDecoder() {
super();
log = LoggerFactory.getLogger(ProtoDecoder.class);
log.debug("Created ProtoDecoder");
}
@Override
protected boolean doDecode(IoSession session, IoBuffer in,
ProtocolDecoderOutput out) throws Exception {
MyMessage cm;
if(session.containsAttribute("cm")){
cm = (MyMessage) session.getAttribute("cm");
}else{
cm = new MyMessage(session);
session.setAttribute("cm", cm);
}
// Now find the first <STX> (Hex 02) for Synch
while (in.hasRemaining()) {
byte current = in.get();
switch (current) {
case START_MSG:
log.debug("Start Message");
cm.setInMsg(true);
cm.setReaded_bytes(0);
break;
case END_MSG:
log.debug("End Message");
cm.setInMsg(false);
break;
case ACK:
log.debug("ACK");
break;
case NACK:
log.debug("NACK");
break;
default:
if(!cm.getInMsg()){
log.debug("no Start Message
found");
}else{
cm.setTempBuffer(current);
cm.incReaded_bytes();
// Decoded one line;
CumulativeProtocolDecoder will
// call me again until I return
false. So just
// return true until there are
no more lines in the
// buffer.
return true;
}// end else
break;
}// End switch (current)
}// end while
cm.getTempBuffer().flip();
if(!cm.getInMsg() && cm.calcCheckSum(session)){
log.debug("OK");
}else
log.debug("NOK");
out.write(in);
log.debug("return false");
return false;
} // end doDecode
} // end class