Dear mina list members,

I am trying to write a new ProtoDecoder with MINA version 2.0.0-M6 for a
binary protocol that arrives over the serial transport.

Every time I receive a message on the serial input, my handler is called
6 times with the same input.

My code is attached, in case anybody wants to take a look at what I have done ;-).

It would be nice if somebody could help me find the error.
I am sure I have misunderstood the interaction between the decoder and
the handler ;-(

Since I am new to MINA, maybe there is also a better way
to save the input than adding an IoBuffer to MyMessage.

The log output writes the following:

10925 [Thread-3] INFO  LoggingFilter  - RECEIVED: HeapBuffer[pos=0 lim=8
cap=8: 02 30 30 35 42 30 30 46]
10944 [Thread-3] INFO  LoggingFilter  - RECEIVED: HeapBuffer[pos=0
lim=12 cap=12: 46 31 30 46 46 31 32 32 32 36 32 03]
10947 [Thread-3] DEBUG MyHandler  - cm tb HeapBuffer[pos=0 lim=18
cap=256: 30 30 35 42 30 30 46 46 31 30 46 46 31 32 32 32...]
10947 [Thread-3] DEBUG MyHandler  - Received : HeapBuffer[pos=12 lim=12
cap=12: empty]
10947 [Thread-3] DEBUG MyHandler  -  session readed_bytes:18

11412 [Thread-3] INFO  LoggingFilter  - RECEIVED: HeapBuffer[pos=0 lim=8
cap=8: 02 30 30 35 42 30 30 46]
11433 [Thread-3] INFO  LoggingFilter  - RECEIVED: HeapBuffer[pos=0
lim=12 cap=12: 46 31 30 46 46 31 32 32 32 36 32 03]
11436 [Thread-3] DEBUG MyHandler  - cm tb HeapBuffer[pos=0 lim=18
cap=256: 30 30 35 42 30 30 46 46 31 30 46 46 31 32 32 32...]
11436 [Thread-3] DEBUG MyHandler  - Received : HeapBuffer[pos=12 lim=12
cap=12: empty]
11436 [Thread-3] DEBUG MyHandler  -  session readed_bytes:18

11905 [Thread-3] INFO  LoggingFilter  - RECEIVED: HeapBuffer[pos=0 lim=8
cap=8: 02 30 30 35 42 30 30 46]
11926 [Thread-3] INFO  LoggingFilter  - RECEIVED: HeapBuffer[pos=0
lim=12 cap=12: 46 31 30 46 46 31 32 32 32 36 32 03]
11929 [Thread-3] DEBUG MyHandler  - cm tb HeapBuffer[pos=0 lim=18
cap=256: 30 30 35 42 30 30 46 46 31 30 46 46 31 32 32 32...]
11929 [Thread-3] DEBUG MyHandler  - Received : HeapBuffer[pos=12 lim=12
cap=12: empty]
11929 [Thread-3] DEBUG MyHandler  -  session readed_bytes:18

next sequence ...
###

Many many thanks

BR

Aleks
package org.ala;

import java.net.InetSocketAddress;

import org.apache.mina.core.future.ConnectFuture;
import org.apache.mina.core.service.IoConnector;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.TextLineEncoder;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.serial.SerialAddress;
import org.apache.mina.transport.serial.SerialConnector;
import org.apache.mina.transport.serial.SerialAddress.DataBits;
import org.apache.mina.transport.serial.SerialAddress.FlowControl;
import org.apache.mina.transport.serial.SerialAddress.Parity;
import org.apache.mina.transport.serial.SerialAddress.StopBits;
import org.apache.mina.transport.socket.SocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;

/**
 * Command-line entry point: opens the serial port {@code /dev/ttyS0} through a
 * MINA {@link SerialConnector}, installs a logging filter and the
 * {@link ProtoDecoder} based codec, and hands decoded input to {@link MyHandler}.
 */
public class Main {

    /** TCP port used only by the disabled socket-acceptor variant at the end of main. */
    private static final int PORT = 8080;

    /**
     * Connects to the serial port and leaves the session running.
     *
     * <p>If the connection cannot be established, the connector is disposed and
     * the JVM exits with status {@code 1} so that callers (shell scripts, service
     * managers) can detect the failure.
     *
     * @param args command-line arguments (unused)
     * @throws Exception declared for compatibility; failures while connecting are
     *         caught, printed and turned into a non-zero exit status
     */
    public static void main(String[] args) throws Exception {
        ProtocolCodecFilter pcf =
                new ProtocolCodecFilter(new TextLineEncoder(), new ProtoDecoder());

        IoConnector connector = new SerialConnector();
        connector.getFilterChain().addFirst("logging", new LoggingFilter());
        connector.getFilterChain().addLast("codec", pcf);

        // NOTE(review): the setter takes milliseconds, so this is a 30 ms timeout.
        // Confirm that 30 seconds (30000) was not intended.
        connector.setConnectTimeoutMillis(30);
        connector.setHandler(new MyHandler());

        SerialAddress portAddress = new SerialAddress("/dev/ttyS0", 19200,
                DataBits.DATABITS_7, StopBits.BITS_1, Parity.EVEN, FlowControl.NONE);

        try {
            ConnectFuture future = connector.connect(portAddress);

            future.await();
            System.out.println("fu-isConn " + future.isConnected());
            if (future.isConnected()) {
                IoSession sessin = future.getSession();
                System.out.println("fu Str " + future.toString());
                System.out.println("sess " + sessin);
            } else {
                future.cancel();
                connector.dispose();
                // Non-zero status: the port could not be opened.
                System.exit(1);
            }
        } catch (Exception e) {
            e.printStackTrace();
            connector.dispose();
            // Non-zero status: connecting failed with an exception.
            System.exit(1);
        }

        /* Disabled alternative: serve the same codec and handler over TCP.

        SocketAcceptor accpetor = new NioSocketAcceptor();
        accpetor.setReuseAddress( true );

        accpetor.getFilterChain().addFirst("logging", new LoggingFilter());
        accpetor.getFilterChain().addLast("codec",pcf);

        accpetor.setHandler(new MyHandler());
        accpetor.bind(new InetSocketAddress(PORT));
        System.out.println("Listeningon port " + PORT);
        */
    }

}
package org.ala;

import org.ala.MyMessage;

import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MyHandler extends IoHandlerAdapter {
        private final Logger log;
        private MyMessage cm;
        
        public MyHandler() {
                // TODO Auto-generated constructor stub
                log = LoggerFactory.getLogger(MyHandler.class);
                log.debug("Created MyHandler");
        }
        
        public void messageReceived(IoSession session, Object message) throws 
Exception {
                
                if(session.containsAttribute("cm")){
                        cm = (MyMessage) session.getAttribute("cm");
                        log.debug("have MyMessage " + cm );
                }else{
                        cm = new MyMessage(session);
                        session.setAttribute("cm", cm);
                }
                
                if(cm.getInMsg()){
                        log.debug("still in body");
                }else{
                log.debug("cm tb " + cm.getTempBuffer());
                log.debug("Received : " + message);
                log.debug(" session readed_bytes:" + cm.getReaded_bytes());
                  if(session.containsAttribute("cm")){
                        cm = (MyMessage) session.getAttribute("cm");
                        session.removeAttribute("cm");
                        cm = null;
                        log.debug("in " + this.getClass().getName() + 
".messageReceived removed MyMessage ");
                  }
                }
        }

        public void sessionOpened(IoSession session) throws Exception{
                log.debug("sessionOpened " + session);
        }

        public void sessionCreated(IoSession session) throws Exception{
                session.getConfig().setIdleTime(IdleStatus.BOTH_IDLE, 30); // 
sec
                log.debug("sessionCreated " + session);
        }

        public void sessionClosed(IoSession session) throws Exception {
                log.debug("sessionClosed " + session);
        }
                  
        public void exceptionCaught(IoSession session, Throwable cause) throws 
Exception{
                log.debug(" session " + session + " exceptionCaught:");
                log.error("exceptionCaught ", cause);
                //cause.printStackTrace();
                  if(session.containsAttribute("cm")){
                        cm = (MyMessage) session.getAttribute("cm");
                        session.removeAttribute("cm");
                        cm = null;
                        log.debug("in " + this.getClass().getName() + 
".exceptionCaught removed MyMessage ");
                  }
                session.close(true);
        }
                  
        @Override
        public void sessionIdle(IoSession session, IdleStatus status) throws 
Exception{
                //System.out.println("sessionIdle status " + 
session.getIdleCount( status));
                log.debug("sessionIdle status " + session.getIdleCount( 
status));
                
                if(session.getIdleCount( status ) == 3){
                  if(session.containsAttribute("cm")){
                        cm = (MyMessage) session.getAttribute("cm");
                        session.removeAttribute("cm");
                        cm = null;
                        log.debug("in " + this.getClass().getName() + 
".sessionIdle removed MyMessage ");
                  }
                  session.close(true);
                }
        }
}
package org.ala;

import java.nio.charset.CharacterCodingException;
import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Accumulates the payload characters of one frame received on a session and
 * verifies the frame's checksum.
 *
 * <p>The payload is stored byte by byte in an auto-expanding {@link IoBuffer};
 * {@code readed_bytes} counts how many bytes were appended. The string fields
 * (RR, PP, TT, P1, P2, DN, MT) are plain holders with getters and setters;
 * nothing in this class populates them.
 */
public class MyMessage {

    /** Index of the hex pair that carries the transmitted checksum. */
    private static final int CHECKSUM_OFFSET = 16;

    /** Minimum number of payload bytes a frame must have to be accepted. */
    private static final int MIN_FRAME_LENGTH = 18;

    private static final Logger log = LoggerFactory.getLogger(MyMessage.class);

    private String rrField;
    private String ppField;
    private String ttField;
    private String p1Field;
    private String p2Field;
    private String dnFields;
    private String mtField;

    private Boolean inMsg;

    private IoSession ioSession;

    private IoBuffer tempBuffer;

    private Integer readed_bytes;

    /**
     * Creates an empty message bound to {@code session}: not inside a frame,
     * zero bytes read, and a 256-byte auto-expanding payload buffer.
     *
     * @param session the session this message belongs to
     */
    public MyMessage(IoSession session) {
        // Fields are assigned directly so that no overridable method runs
        // while the object is still being constructed.
        this.ioSession = session;
        this.inMsg = false;
        this.readed_bytes = 0;
        this.tempBuffer = IoBuffer.allocate(256).setAutoExpand(true);
        log.debug("Created MyMessage");
    }

    /**
     * @param rRField the rRField to set
     */
    public void setRRField(String rRField) {
        this.rrField = rRField;
    }

    /**
     * @return the rRField
     */
    public String getRRField() {
        return rrField;
    }

    /**
     * @param tTField the tTField to set
     */
    public void setTTField(String tTField) {
        this.ttField = tTField;
    }

    /**
     * @return the tTField
     */
    public String getTTField() {
        return ttField;
    }

    /**
     * @param pPField the pPField to set
     */
    public void setPPField(String pPField) {
        this.ppField = pPField;
    }

    /**
     * @return the pPField
     */
    public String getPPField() {
        return ppField;
    }

    /**
     * @param p1Field the p1Field to set
     */
    public void setP1Field(String p1Field) {
        this.p1Field = p1Field;
    }

    /**
     * @return the p1Field
     */
    public String getP1Field() {
        return p1Field;
    }

    /**
     * @param p2Field the p2Field to set
     */
    public void setP2Field(String p2Field) {
        this.p2Field = p2Field;
    }

    /**
     * @return the p2Field
     */
    public String getP2Field() {
        return p2Field;
    }

    /**
     * @param dNFields the dNFields to set
     */
    public void setDNFields(String dNFields) {
        this.dnFields = dNFields;
    }

    /**
     * @return the dNFields
     */
    public String getDNFields() {
        return dnFields;
    }

    /**
     * @param mTField the mTField to set
     */
    public void setMTField(String mTField) {
        this.mtField = mTField;
    }

    /**
     * @return the mTField
     */
    public String getMTField() {
        return mtField;
    }

    /**
     * Verifies the checksum of the accumulated payload.
     *
     * <p>The payload is read as consecutive pairs of ASCII hex digits. The pair
     * at index {@value #CHECKSUM_OFFSET} is the transmitted checksum; all other
     * pairs are summed. The frame is valid when
     * {@code (0xFF - sum) & 0xFF} equals the transmitted checksum and at least
     * {@value #MIN_FRAME_LENGTH} bytes were received.
     *
     * <p>Malformed input (odd number of bytes, a count larger than the backing
     * array, or a non-hex character) yields {@code false} instead of an
     * unchecked exception, because the data comes straight from the wire.
     * The payload buffer is rewound as a side effect.
     *
     * @param MySess unused; kept for interface compatibility
     * @return {@code true} if the frame is long enough and its checksum matches
     * @throws CharacterCodingException never thrown; kept for interface compatibility
     */
    public boolean calcCheckSum(IoSession MySess) throws CharacterCodingException {
        final int length = getReaded_bytes();
        rewind_buf();

        final byte[] data = tempBuffer.array();
        if (length % 2 != 0 || length > data.length) {
            log.debug("payload length {} cannot be split into hex pairs", length);
            return false;
        }

        int chksum = 0;
        int sum = 0;
        for (int idx = 0; idx < length; idx += 2) {
            // Bytes are widened one-to-one to chars, independent of the platform charset.
            String pair = new String(new char[] {
                    (char) (data[idx] & 0xFF), (char) (data[idx + 1] & 0xFF) });
            log.debug(" <<<<<<<>>>>>>> tempBuffer.array(),{},2 {}", idx, pair);

            int value = parseHexPair(pair);
            if (value < 0) {
                log.debug("non-hex characters at index {}", idx);
                return false;
            }
            if (idx != CHECKSUM_OFFSET) {
                chksum += value;
            } else {
                sum = value;
            }
        }
        log.debug("vorher chksum " + Integer.toHexString(chksum));

        int t = (0xFF - chksum) & 0xFF;
        log.debug("t      " + Integer.toHexString(t));

        boolean matches = (t == sum);
        log.debug("ll " + matches);

        return matches && length >= MIN_FRAME_LENGTH;
    }

    /**
     * Parses a two-character hexadecimal string.
     *
     * @return the value 0..255, or -1 if either character is not a hex digit
     */
    private static int parseHexPair(String pair) {
        int high = Character.digit(pair.charAt(0), 16);
        int low = Character.digit(pair.charAt(1), 16);
        if (high < 0 || low < 0) {
            return -1;
        }
        return (high << 4) | low;
    }

    /**
     * @param myIoSess the myIoSess to set
     */
    public void setMyIoSess(IoSession myIoSess) {
        this.ioSession = myIoSess;
    }

    /**
     * @return the myIoSess
     */
    public IoSession getMyIoSess() {
        return ioSession;
    }

    /**
     * @param inMsg the inMsg to set
     */
    public void setInMsg(Boolean inMsg) {
        this.inMsg = inMsg;
    }

    /**
     * @return the inMsg
     */
    public Boolean getInMsg() {
        return inMsg;
    }

    /**
     * Appends one byte to the payload buffer. Despite the setter-style name,
     * this does not replace the buffer.
     *
     * @param tB the byte to append
     */
    public void setTempBuffer(byte tB) {
        this.tempBuffer.put(tB);
    }

    /**
     * @return the tempBuffer
     */
    public IoBuffer getTempBuffer() {
        return tempBuffer;
    }

    /** Rewinds the payload buffer so that it can be read from the start. */
    public void rewind_buf() {
        tempBuffer.rewind();
    }

    /**
     * @param readed_bytes the readed_bytes to set
     */
    public void setReaded_bytes(Integer readed_bytes) {
        this.readed_bytes = readed_bytes;
    }

    /**
     * @return the readed_bytes
     */
    public Integer getReaded_bytes() {
        return readed_bytes;
    }

    /** Increments the count of payload bytes read by one. */
    public void incReaded_bytes() {
        readed_bytes++;
    }

    /** Logs when the garbage collector finalizes this instance (debugging aid). */
    @Override
    protected void finalize() throws Throwable {
        log.debug("Called " + this.getClass().getName() + ".finalize");
    }
}
package org.ala;

import org.apache.mina.core.buffer.IoBuffer;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ProtoDecoder extends CumulativeProtocolDecoder {

        private final byte START_MSG = 0x02;
        private final byte END_MSG   = 0x03;
        private final byte ACK       = 0x06;
        private final byte NACK      = 0x16;
        
        private final Logger log;
        
        public ProtoDecoder() {
                super();
                log = LoggerFactory.getLogger(ProtoDecoder.class);
                log.debug("Created ProtoDecoder");
        }
        
        
        @Override
        protected boolean doDecode(IoSession session, IoBuffer in,
                        ProtocolDecoderOutput out) throws Exception {

                MyMessage cm;
                if(session.containsAttribute("cm")){
                        cm = (MyMessage) session.getAttribute("cm");
                }else{
                        cm = new MyMessage(session);
                        session.setAttribute("cm", cm);
                }
        
        // Now find the first <STX>   (Hex 02) for Synch
        
        while (in.hasRemaining()) {
            byte current = in.get();
            
            switch (current) {
                                case START_MSG:
                                        log.debug("Start Message");
                                        cm.setInMsg(true);
                                        cm.setReaded_bytes(0);
                                        break;
                                case END_MSG:
                                        log.debug("End Message");
                                        cm.setInMsg(false);
                                        break;
                                case ACK:
                                        log.debug("ACK");
                                        break;
                                case NACK:
                                        log.debug("NACK");
                                        break;
                                default:
                                        if(!cm.getInMsg()){
                                                log.debug("no Start Message 
found");
                                        }else{
                                                cm.setTempBuffer(current);
                                                cm.incReaded_bytes();
                                                // Decoded one line; 
CumulativeProtocolDecoder will
                                                // call me again until I return 
false. So just
                                                // return true until there are 
no more lines in the
                                                // buffer.
                                                return true;
                                        }// end else
                                        break;
                }// End switch (current)
            }// end while
        cm.getTempBuffer().flip();

                if(!cm.getInMsg() && cm.calcCheckSum(session)){
                        log.debug("OK");
                }else
                        log.debug("NOK");

        out.write(in);
        log.debug("return false");
                return false;
        } // end doDecode
} // end class

Reply via email to