Hi Arnaud,

The original code that I attached to my previous email is below. (I will update it to address your earlier comments, e.g. removing the printMap method.)

SendMessage.java:


package apache.qpid.client;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.MessageAcceptMode;
import org.apache.qpidity.transport.MessageAcquireMode;
import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.transport.ReplyTo;

/*
* this sends the "Class Query" and "Schema Request" to the broker
*/
public class SendMessage
{       // AMQP type
        String opcode;
        /*
        int sequenceNo;
        String packageName;
        String schemaHash;
        String classname;
        */

        public static void main(String[] args)
        {
                SendMessage p=new SendMessage();
                // Create connection
                Connection con = Client.createConnection();
                ByteBuffer message= ByteBuffer.allocate(100);
                try
                {       // connect to local host on default port 5672
                        con.connect("localhost", 5672, "test", "guest", "guest");
                }
                catch(Exception e)
                {
                        System.out.print("Error connecting to broker");
                        e.printStackTrace();
                }

                // Create session
                Session session = con.createSession(0);
                DeliveryProperties deliveryProps = new DeliveryProperties();
                // set the routing key as "agent"
                deliveryProps.setRoutingKey("agent");
                MessageProperties messageProps=new MessageProperties();
                // set replyTo field so that messages return to the "reply" queue
                ReplyTo rpt=new ReplyTo();
                rpt.setExchange("amq.direct");
                rpt.setRoutingKey("reply");
                messageProps.setReplyTo(rpt);

                // transfer message to "qpid.management" exchange
                session.messageTransfer("qpid.management", MessageAcceptMode.EXPLICIT,
                                MessageAcquireMode.PRE_ACQUIRED);
                session.header(deliveryProps,messageProps);
                /*
                 * JUST FOR TEST
                 * Read input from the keyboard:
                 * if the user enters 1, the client sends the "Class Query";
                 * if the user enters 2, the client sends the "Schema Request".
                 */
                System.out.println("Enter an operation ");
                BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
                int i=0;
                try {
                        i = Integer.parseInt(br.readLine().trim());
                } catch (Exception e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                }
                switch (i)
                {
                /*
                 * if user enters 1 then a "Class Query" message will be sent to the broker
                 *
                 *  +-----+-----+-----+-----+-----------------------+
                 *  | 'A' | 'M' | '1' | 'Q' |          seq          |
                 *  +-----+-----+-----+-----+-----------------------+----------+
                 *  |  package name (short string)                             |
                 *  +----------------------------------------------------------+
                 */
                case 1:

                        p.opcode="AM1Q";
                        //p.sequenceNo=100;
                        //p.packageName="qpid";
                        //p.classname="exchange";
                        /*
                         * I am wondering why this works: I am sending only the
                         * type/opcode (e.g. AM1Q), unlike the format shown above
                         * (according to the management design notes)
                         * http://cwiki.apache.org/qpid/management-design-notes.html
                         */

                        message.clear();
                        try {
                                message.put(p.opcode.getBytes("UTF-8"));
                        } catch (UnsupportedEncodingException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                        }
                        break;

                /*
                 * if user enters 2 then a "Schema request" message will be sent to the broker
                 *
                 *  +-----+-----+-----+-----+-----------------------+
                 *  | 'A' | 'M' | '1' | 'S' |          seq          |
                 *  +-----+-----+-----+-----+-----------------------+----------+
                 *  |                packageName (short string)                |
                 *  +----------------------------------------------------------+
                 *  |                className (short string)                  |
                 *  +----------------------------------------------------------+
                 *  |                schema-hash (bin128)                      |
                 *  +----------------------------------------------------------+
                 */
                case 2:
                        p.opcode="AM1S";
                        //p.sequenceNo=100;
                        //p.packageName="qpid";
                        //p.classname="exchange";
                        /*
                         * I am wondering why this works: I am sending only the
                         * type/opcode (e.g. AM1S), unlike the format shown above
                         * (according to the management design notes)
                         * http://cwiki.apache.org/qpid/management-design-notes.html
                         */
                        message.clear();
                        try {
                                message.put(p.opcode.getBytes("UTF-8"));
                        
                        } catch (UnsupportedEncodingException e1) {
                                // TODO Auto-generated catch block
                                e1.printStackTrace();
                        }
                        break;

                default:
                        break;
                }
                message.flip();
                // send message
                session.data(message);
                session.endData();
                // confirm completion
                session.sync();
                //cleanup
                session.sessionDetach(session.getName());
                try
                {       // close connection
                        con.close();
                }
                catch(Exception e)
                {
                        System.out.print("Error closing broker connection");
                        e.printStackTrace();
                }
        }
}
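
For comparison with the opcode-only messages above, here is a minimal sketch of how the full "Class Query" layout from the comment could be encoded with a plain java.nio.ByteBuffer. It rests on assumptions that are not confirmed in this thread: that seq is a 32-bit big-endian integer, and that a "short string" is a one-byte length followed by that many bytes. The class and method names are hypothetical and are not part of the qpidity API.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of org.apache.qpidity.
final class ClassQueryEncoder {

    // Encodes 'A' 'M' '1' 'Q', then seq, then the package name as a short string.
    // ASSUMPTION: seq is 4 bytes, big-endian (ByteBuffer's default order).
    // ASSUMPTION: a short string is a 1-byte length followed by the bytes.
    static ByteBuffer encode(int seq, String packageName) {
        byte[] name = packageName.getBytes(StandardCharsets.UTF_8);
        if (name.length > 255) {
            throw new IllegalArgumentException("short string is limited to 255 bytes");
        }
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 1 + name.length);
        buf.put("AM1Q".getBytes(StandardCharsets.US_ASCII)); // magic number + opcode
        buf.putInt(seq);                                     // sequence number
        buf.put((byte) name.length);                         // short-string length
        buf.put(name);                                       // short-string body
        buf.flip();                                          // ready for reading/sending
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode(100, "qpid");
        System.out.println("encoded " + buf.remaining() + " bytes"); // prints "encoded 13 bytes"
    }
}
```

A buffer built this way could be handed to session.data(...) in place of the opcode-only buffer; whether the broker actually requires the extra fields is not established here.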


ListenerReply.java:


package apache.qpid.client;

import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessageListener;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;

import org.apache.qpidity.transport.MessageCreditUnit;

/**
 * Listens for messages on the "reply" queue and terminates
 * when it sees the final message.
 */
public class ListenerReply implements MessageListener
{
        ByteBuffer buf;
        public void onMessage(Message m)
        {
                try
                {
                        buf = m.readData();
                        // get the Magic number "AM1"
                        char magicNumber = (char)buf.get();
                        System.out.println(magicNumber);
                        magicNumber = (char)buf.get();
                        System.out.println(magicNumber);
                        magicNumber = (char)buf.get();
                        System.out.println(magicNumber);
                        // get the opcode
                        char opcode = (char)buf.get();
                        System.out.println(opcode);
                        // java decoder
                        ManagementDecoder decoder = new ManagementDecoder(buf);
                        // get the decoded sequence number
                        System.out.println(decoder.readSequenceNo());

                        switch (opcode) {
                
                        /*
                         * if the response is "query indication"
                         *
                         *  +-----+-----+-----+-----+-----------------------+
                         *  | 'A' | 'M' | '1' | 'q' |          seq          |
                         *  +-----+-----+-----+-----+-----------------------+----------+
                         *  |  package name (short string)                             |
                         *  +----------------------------------------------------------+
                         *  |  class name (short string)                               |
                         *  +----------------------------------------------------------+
                         *  |  schema hash (bin128)                                    |
                         *  +----------------------------------------------------------+
                         */
                        case 'q':
                                // decode the package name
                                String packagename = decoder.readStr8();
                                // decode the class name
                                String classname = decoder.readStr8();
                                System.out.println("Package Name: " + packagename);
                                System.out.println("Class Name: " + classname);
                                // decode the schema hash
                                // TODO: this prints the hex value of the schema hash...
                                // it needs to be printed in a readable form
                                System.out.println("Schema hash is: " + org.apache.qpidity.transport.util.Functions.str(decoder.readBin128()));
                                //System.out.println("Schema hash is: " + decoder.readUint64() + decoder.readUint64());
                                break;
                        /*
                         * if the response is "schema response"
                         *
                         *  +-----+-----+-----+-----+-----------------------+
                         *  | 'A' | 'M' | '1' | 's' |          seq          |
                         *  +-----+-----+-----+-----+-----------------------+----------+
                         *  |                packageName (short string)                |
                         *  +----------------------------------------------------------+
                         *  |                className (short string)                  |
                         *  +----------------------------------------------------------+
                         *  |                schema-hash (bin128)                      |
                         *  +-----------+-----------+-----------+-----------+----------+
                         *  | propCnt   | statCnt   | methodCnt | eventCnt  |
                         *  +-----------+-----------+-----------+-----------+----------------------------+
                         *  | propCnt property records                                                   |
                         *  +----------------------------------------------------------------------------+
                         *  | statCnt statistic records                                                  |
                         *  +----------------------------------------------------------------------------+
                         *  | methodCnt method records                                                   |
                         *  +----------------------------------------------------------------------------+
                         *  | eventCnt event records                                                     |
                         *  +----------------------------------------------------------------------------+
                         */
                        case 's':
                                String packname = decoder.readStr8();
                                String clasnam = decoder.readStr8();

                                System.out.println("Package Name: " + packname);
                                System.out.println("Class Name: " + clasnam);
                                // Added new method i.e. readBin128() in AbstractDecoder.java
                                // (under org.apache.qpidity.transport.codec) for decoding
                                // 16 bytes of opaque binary data
                                // TODO: this prints the hex value of the schema hash...
                                // it needs to be printed in a readable form
                                System.out.println("Schema is: " + org.apache.qpidity.transport.util.Functions.str(decoder.readBin128()));
                                // get the decoded properties contents
                                int propCnt = decoder.readUint16();
                                System.out.println("Property content:   " + propCnt);
                                for( int i = 0; i < propCnt; i++ )
                                {
                                        // decode the MAP
                                        // FIXME: getting exception here
                                        // "java.lang.IllegalArgumentException: unknown code: 6"
                                        Map<String,Object> map = decoder.readMap();
                                        printMap(map);
                                }
                                // TODO: Also do the same for statistic records, method records,
                                // event records
                                break;
                        
                        default:
                        break;
                        }
                }
                catch(Exception e)
                {
                        System.out.print("Error reading message");
                        e.printStackTrace();
                }
        }
        /*
         * print the decoded map
         * TODO: fix the warning and comment it when it works
         */
        public void printMap(Map mapnew)
        {
                Set s=mapnew.entrySet();
                Iterator it=s.iterator();
                for (Iterator iterator = s.iterator(); iterator.hasNext();) {
                        Object object = (Object) iterator.next();
                        System.out.println(object.toString());
                }
        }

        
        public static void main(String[] args)
        {
                // Create connection
                Connection con = Client.createConnection();
                try
                {       // connect to local host on default port 5672
                        con.connect("localhost", 5672, "test", "guest", "guest");
                }
                catch(Exception e)
                {
                        System.out.print("Error connecting to broker");
                        e.printStackTrace();
                }
                // Create session
                Session session = con.createSession(0);
                // Create an instance of the listener
                ListenerReply listener = new ListenerReply();
                // create a subscription with the "reply" queue
                session.messageSubscribe("reply",
                                "listener_reply",
                                Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
                                Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
                                new MessagePartListenerAdapter(listener), null);
                // controls the flow of message data to a given destination
                session.messageFlow("listener_reply", MessageCreditUnit.BYTE,
                                Session.MESSAGE_FLOW_MAX_BYTES);
                // will get maximum of 11 messages
                session.messageFlow("listener_reply",
                                MessageCreditUnit.MESSAGE, 11);
                // confirm completion
                session.sync();
                // This method cancels a consumer. The server will not send any
                // more messages to the "listener_reply" destination.
                session.messageCancel("listener_reply");
                //cleanup
                session.sessionDetach(session.getName());
                try
                {       // close connection
                        con.close();
                }
                catch(Exception e)
                {
                        System.out.print("Error closing broker connection");
                        e.printStackTrace();
                }
        }

}



-------

Best Regards,
Rahul

On Jun 27 2008, Arnaud Simon wrote:

Hi,

> Rafael mentioned last night that AbastractDecoder does not know it so > we added another method i.e. readBin128() in AbstractDecoder.java > (under org.apache.qpidity.transport.codec) for decoding 16 bytes of > opaque binary data. It produced some unknown characters (e.g. output: > Schema hash is: > &#65533;\&#65533;W&#65533;p&#65533;`&#1566;??V&#65533;?) as schema hash > is an md5 digest and md5 hashes are binary data so in this case I was > converting to a string like this System.out.println("Schema hash is " + > new String(decoder.readBin128())); > > So I converted this non printable characters to hex (with Rafael's > advice just to get rid off these some unknown character) by using > org.apache.qpidity.transport.util.Functions.str(decoder.readBin128()) > method. so the schema hash printed as (Schema is: > "\x85\x5c\xea\x96W\xe3p\xcc`\xd8\x9e\x1a\x08V\xa4\x0e")... > > 1) I want to see the Schema hash in the correct way ? How would I > print it?

I don't think you want to print the hash, you only need it for sending
further requests. So, the hash is only 16 bytes of opaque binary, that's
all.
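
If a printable form is still wanted for debugging, one option is to format the 16 bytes as plain hexadecimal. The following is only a sketch using the standard library; the class and method names are hypothetical, and the sample bytes are the ones from the output quoted above.

```java
// Hypothetical helper, not part of org.apache.qpidity.
final class HashHex {

    // Formats each byte as two lowercase hex digits.
    static String toHex(byte[] data) {
        StringBuilder sb = new StringBuilder(data.length * 2);
        for (byte b : data) {
            sb.append(String.format("%02x", b & 0xFF)); // mask to avoid sign extension
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // The 16 bytes shown in the quoted output:
        // "\x85\x5c\xea\x96W\xe3p\xcc`\xd8\x9e\x1a\x08V\xa4\x0e"
        byte[] hash = {
            (byte) 0x85, 0x5c, (byte) 0xea, (byte) 0x96, 0x57, (byte) 0xe3, 0x70, (byte) 0xcc,
            0x60, (byte) 0xd8, (byte) 0x9e, 0x1a, 0x08, 0x56, (byte) 0xa4, 0x0e
        };
        System.out.println(toHex(hash)); // prints 855cea9657e370cc60d89e1a0856a40e
    }
}
```

The raw byte array is still what would be sent back in later requests; the hex string is for display only.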
> -----
>
> As I was ignoring what I was getting for the "schema hash" field (hex
> value) and moving on to the next field, i.e. propCnt (properties
> content), I am doing this:
>
>   int propCnt = decoder.readUint16();
>   System.out.println("Property content: " + propCnt);
>
>   for( int i = 0; i < propCnt; i++ )
>   {
>       Map<String,Object> map = decoder.readMap();
>       printMap(map);
>   }
>
>   public void printMap(Map mapnew)
>   {
>       Set s=mapnew.entrySet();
>       Iterator it=s.iterator();
>       for (Iterator iterator = s.iterator(); iterator.hasNext();) {
>           Object object = (Object) iterator.next();
>           System.out.println(object.toString());
>       }
>   }
>
> BUT, getting an exception here
> A
> M
> 1
> s
> 15
> Package Name: qpid
> Class Name: vhost
> Schema is: "\x85\x5c\xea\x96W\xe3p\xcc`\xd8\x9e\x1a\x08V\xa4\x0e"
> Property content:   2
> code is : 6
>
> Error reading messagejava.lang.IllegalArgumentException: unknown code: 6
>    at org.apache.qpidity.transport.codec.AbstractDecoder.getType(AbstractDecoder.java:302)
>    at org.apache.qpidity.transport.codec.AbstractDecoder.readMap(AbstractDecoder.java:253)
>    at apache.qpid.client.ListenerReply.onMessage(ListenerReply.java:109)
>    at org.apache.qpidity.nclient.util.MessagePartListenerAdapter.messageReceived(MessagePartListenerAdapter.java:56)
>    at org.apache.qpidity.nclient.impl.ClientSessionDelegate.data(ClientSessionDelegate.java:41)
>    at org.apache.qpidity.transport.SessionDelegate.data(SessionDelegate.java:1)
>    at org.apache.qpidity.transport.Data.delegate(Data.java:78)
>    at org.apache.qpidity.transport.Channel.data(Channel.java:114)
>    at org.apache.qpidity.transport.Channel.data(Channel.java:1)
>    at org.apache.qpidity.transport.Data.delegate(Data.java:78)
>    at org.apache.qpidity.transport.Channel.received(Channel.java:75)
>    at org.apache.qpidity.transport.Connection.received(Connection.java:84)
>    at org.apache.qpidity.transport.Connection.received(Connection.java:1)
>    at org.apache.qpidity.transport.network.Assembler.emit(Assembler.java:92)
>    at org.apache.qpidity.transport.network.Assembler.emit(Assembler.java:97)
>    at org.apache.qpidity.transport.network.Assembler.frame(Assembler.java:125)
>    at org.apache.qpidity.transport.network.Frame.delegate(Frame.java:145)
>    at org.apache.qpidity.transport.network.Assembler.received(Assembler.java:102)
>    at org.apache.qpidity.transport.network.Assembler.received(Assembler.java:1)
>    at org.apache.qpidity.transport.network.InputHandler.frame(InputHandler.java:103)
>    at org.apache.qpidity.transport.network.InputHandler.next(InputHandler.java:204)
>    at org.apache.qpidity.transport.network.InputHandler.received(InputHandler.java:116)
>    at org.apache.qpidity.transport.network.InputHandler.received(InputHandler.java:1)
>    at org.apache.qpidity.transport.network.mina.MinaHandler.messageReceived(MinaHandler.java:87)
>    at org.apache.mina.common.support.AbstractIoFilterChain$TailFilter.messageReceived(AbstractIoFilterChain.java:703)
>    at org.apache.mina.common.support.AbstractIoFilterChain.callNextMessageReceived(AbstractIoFilterChain.java:362)
>    at org.apache.mina.common.support.AbstractIoFilterChain.access$1200(AbstractIoFilterChain.java:54)
>    at org.apache.mina.common.support.AbstractIoFilterChain$EntryImpl$1.messageReceived(AbstractIoFilterChain.java:800)
>    at org.apache.mina.common.support.AbstractIoFilterChain$HeadFilter.messageReceived(AbstractIoFilterChain.java:617)
>    at org.apache.mina.common.support.AbstractIoFilterChain.callNextMessageReceived(AbstractIoFilterChain.java:362)
>    at org.apache.mina.common.support.AbstractIoFilterChain.fireMessageReceived(AbstractIoFilterChain.java:353)
>    at org.apache.mina.transport.socket.nio.SocketIoProcessor.read(SocketIoProcessor.java:281)
>    at org.apache.mina.transport.socket.nio.SocketIoProcessor.process(SocketIoProcessor.java:241)
>    at org.apache.mina.transport.socket.nio.SocketIoProcessor.access$500(SocketIoProcessor.java:44)
>    at org.apache.mina.transport.socket.nio.SocketIoProcessor$Worker.run(SocketIoProcessor.java:559)
>    at org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:43)
>    at java.lang.Thread.run(Thread.java:619)
>
> I have just an idea why it is raising but how to mitigate it I don't
> know.
>
> 2) How to fix it? Can anyone help me?

Looking at http://cwiki.apache.org/qpid/management-design-notes.html,
the message format is as follows:

        +-----+-----+-----+-----+-----------------------+
        | 'A' | 'M' | '1' | 's' |          seq          |
        +-----+-----+-----+-----+-----------------------+----------+
        |                packageName (short string)                |
        +----------------------------------------------------------+
        |                className (short string)                  |
        +----------------------------------------------------------+
        |                schema-hash (bin128)                      |
        +-----------+-----------+-----------+-----------+----------+
        | propCnt   | statCnt   | methodCnt | eventCnt  |
        +-----------+-----------+-----------+-----------+----------------------------+
        | propCnt property records                                                   |
        +----------------------------------------------------------------------------+
        | statCnt statistic records                                                  |
        +----------------------------------------------------------------------------+
        | methodCnt method records                                                   |
        +----------------------------------------------------------------------------+
        | eventCnt event records                                                     |
        +----------------------------------------------------------------------------+

This means that you need to read propCnt, statCnt, methodCnt and then
eventCnt before you can read the propCnt maps, the statCnt maps, the
methodCnt maps and then the eventCnt maps. You are currently not reading
statCnt, methodCnt and eventCnt.
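
The read order described above can be sketched as follows. This is a minimal illustration on a plain java.nio.ByteBuffer rather than the qpidity decoder. It assumes all four counters are unsigned 16-bit big-endian values: the listener code reads propCnt with readUint16(), but the widths of the other three are an assumption. The class and method names are hypothetical, and the counter values in main are made up for the demonstration.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, not part of org.apache.qpidity.
final class SchemaCounts {

    // Reads propCnt, statCnt, methodCnt and eventCnt, in that order.
    // ASSUMPTION: each counter is an unsigned 16-bit big-endian integer.
    static int[] readCounts(ByteBuffer buf) {
        int[] counts = new int[4];
        for (int i = 0; i < counts.length; i++) {
            counts[i] = buf.getShort() & 0xFFFF; // mask to treat the short as unsigned
        }
        return counts;
    }

    public static void main(String[] args) {
        // Made-up header: propCnt=2, statCnt=3, methodCnt=1, eventCnt=0.
        ByteBuffer buf = ByteBuffer.allocate(8);
        buf.putShort((short) 2).putShort((short) 3).putShort((short) 1).putShort((short) 0);
        buf.flip();

        int[] c = readCounts(buf);
        System.out.println("propCnt=" + c[0] + " statCnt=" + c[1]
                + " methodCnt=" + c[2] + " eventCnt=" + c[3]);
        // Only after all four counters have been consumed is the buffer
        // positioned at the first property record.
        System.out.println("bytes left before records: " + buf.remaining());
    }
}
```

In the listener this would presumably correspond to four consecutive decoder.readUint16() calls before the first decoder.readMap() call, followed by one loop per record type.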

Arnaud


