Hi Arnaud,

I hope you are well.

Following up on our discussion on IRC last evening, I am attaching the code
to this email (as attachments, to preserve the formatting) for your
consideration.

I have attached three Java files:

1. Queue.java - to create two queues on the broker
2. SendMessage.java - sends the request to the broker
3. ListenerReply.java - listens for and retrieves the responses from the broker

Please check case 4 in SendMessage (where we request the schema from the
broker): when I send this request, ListenerReply.java (case 's') is not able
to get the schema.

-> Am I sending the schema request in the correct format? Or am I retrieving
the response from the broker incorrectly? Please help me here.

-> Also, please let me know how to invoke a method. What is the field
"input and bidirectional argument values (in schema order)" in the AM1M
(method request) format?

I look forward to hearing from you.

Best Regards,
Rahul
package apache.qpid.client;

import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
import org.apache.qpidity.nclient.Session;

/*
 * @author [EMAIL PROTECTED]
 * 
 * this program 
 * -> connects with the broker on the default port 5672 with the user & 
password "guest" 
 * -> creates two queues in the broker
 * 1. "management"
 * 2. "reply"
 * -> bind these two queues to the exchanges
 * 1. "management" queue is bounded on the "qpid.management" exchange with the 
routing key "mgmt.#", and
 * 2. "reply" queue is bounded on the "amq.direct" exchange with the routing 
key "reply" 
 * -> Broker sends all the management information on the "management" queue
 */
public class Queue {

        public static void main(String args[])
        {
                // Create connection
                Connection con = Client.createConnection();
                try
                {       // connect with the broker on the default port 5672
                        con.connect("localhost", 5672, "test", "guest", 
"guest");
                }
                catch (Exception e)
                {
                        System.out.print("Error connecting to broker");
                        e.printStackTrace();
                }

                // Create a session
                Session session = con.createSession(0);

                // declare and bind (with "qpid.management" exchange) the 
management queue
                session.queueDeclare("management", null, null);
                session.exchangeBind("management", "qpid.management", "mgmt.#", 
null);

                // confirm completion
                session.sync();
                // declare and bind (with "amq.direct" exchange) the reply queue
                session.queueDeclare("reply", null, null);
                session.exchangeBind("reply", "amq.direct", "reply", null);
                // confirm completion
                session.sync();


                //cleanup
                session.sessionDetach(session.getName());
                try
                {
                        con.close();
                }
                catch(Exception e)
                {
                        System.out.print("Error closing broker connection");
                        e.printStackTrace();
                }
        }

}





package apache.qpid.client;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Properties;

import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.MessageAcceptMode;
import org.apache.qpidity.transport.MessageAcquireMode;
import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.transport.ReplyTo;
import org.apache.qpidity.transport.codec.Encoder;

/*
 * this sends the "Class Query" and "Schema Request" to the broker
 */
public class SendMessage
{       // AMQP type
        String opcode;
        int sequenceNo;
        String packageName; 
        String classname;
        //byte[] schemaHash=new byte[16];
        static byte[] schemaHash;
        // this file contains only schema for "vhost" class at run time
        private String infoFile="./conf/info.prop";

        /*
         * ******this is just for testing purpose******
         * get the schema for "vhost" class as ListenerReply.java program writes
         * the schema for "vhost" in this file (info.prop) when it process the 
AM1Q request and 
         * gives all 14 classes with the schema hash.. then this program writes 
the schema hash value for VHOST in
         * this file...
         * 
         * Contents of info.prop :
         * ------
         * schema  = [EMAIL PROTECTED]
         * ------
         * We are using this value while requesting AM1S as the format is:
         * opcode = AM1S
         * Package name= qpid
         * Class name= vhost
         * Schema Hash= [EMAIL PROTECTED]
         * 
         */
        public void getSchemaHash() throws FileNotFoundException, IOException
        {
                Properties prop = new Properties();
                prop.load(new FileInputStream(new File(infoFile)));
                // getting [EMAIL PROTECTED] into schemaHash
                schemaHash              = prop.getProperty("schema").getBytes();

        }

        public static void main(String[] args)
        {
                SendMessage p=new SendMessage();
                // Create connection
                Connection con = Client.createConnection();
                ByteBuffer message= ByteBuffer.allocate(500);
                ManagementEncoder encoder=new ManagementEncoder(message);

                try
                {       // connect to local host on default port 5672
                        con.connect("localhost", 5672, "test", "guest", 
"guest");
                }
                catch(Exception e)
                {
                        System.out.print("Error connecting to broker");
                        e.printStackTrace();
                }

                // Create session
                Session session = con.createSession(0);
                DeliveryProperties deliveryProps = new DeliveryProperties();
                // set the routing key as "agent"
                deliveryProps.setRoutingKey("agent");
                MessageProperties messageProps=new MessageProperties();
                // set replyTo field so that messages return to the "reply" 
queue  
                ReplyTo rpt=new ReplyTo();
                rpt.setExchange("amq.direct");
                rpt.setRoutingKey("reply");
                messageProps.setReplyTo(rpt);

                // transfer message to "qpid.management" exchange
                session.messageTransfer("qpid.management", 
MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED);
                session.header(deliveryProps,messageProps);


                try {

                        System.out.println();
                        System.out.println("1. Broker Query Request");
                        System.out.println("2. Package Query Request");
                        System.out.println("3. Class Query Request");
                        System.out.println("4. Schema Query Request");
                        System.out.println("5. Method Request");
                        System.out.println("0. Exit");
                        System.out.println();
                        System.out.println("Please Enter your choice for 
REQUEST");
                        BufferedReader br = new BufferedReader(new 
InputStreamReader(System.in));
                        int choice=0;

                        choice = Integer.parseInt(br.readLine().trim());



                        switch (choice)
                        {
                        // if press 0 then Exit
                        case 0:
                                System.exit(0);
                                break;


                        case 1:
                                /*
                                 *       if user enters 1 then a "Broker Query" 
message will be sent to the broker
                                 *     
+-----+-----+-----+-----+-----------------------+
                                   | 'A' | 'M' | '1' | 'B' |           0        
   |
                                   
+-----+-----+-----+-----+-----------------------+
                                                The Broker Request message has 
no payload.
                                 */

                                p.opcode="AM1B";

                                message.clear();
                                try {
                                        message.put(p.opcode.getBytes("UTF-8"));
                                } catch (UnsupportedEncodingException e1) {
                                        e1.printStackTrace();
                                }
                                System.out.println("Query Broker Sent...");

                                break;

                        case 2:

                                /*
                                 *  if user enters 2 then a "Package Query" 
message will be sent to the broker
                                 *  
                                 *         
+-----+-----+-----+-----+-----------------------+
                                                | 'A' | 'M' | '1' | 'P' |       
   seq          |
                                                
+-----+-----+-----+-----+-----------------------+
                                 * 
                                 */
                                p.opcode="AM1P";

                                message.clear();
                                try {
                                        message.put(p.opcode.getBytes("UTF-8"));
                                } catch (UnsupportedEncodingException e1) {
                                        e1.printStackTrace();
                                }
                                System.out.println("Query Package Sent...");

                                break;


                        case 3:
                                /*
                                 * if user enters 3 then a "Class Query" 
message will be sent to the broker
                                 * 
                                 *  
+-----+-----+-----+-----+-----------------------+
                                | 'A' | 'M' | '1' | 'Q' |          seq          
|
                                
+-----+-----+-----+-----+-----------------------+----------+
                                |  package name (short string)                  
           |
                                
+----------------------------------------------------------+

                                 */
                                p.opcode="AM1Q";
                                p.sequenceNo=600;
                                p.packageName="qpid";
                                message.clear();
                                try {
                                        message.put(p.opcode.getBytes("UTF-8"));
                                        message.putInt( p.sequenceNo);
                                        encoder.writeStr8(p.packageName);
                                        message.putShort((byte) 
p.packageName.length());
                                        
message.put(p.packageName.getBytes("UTF-8"));
                                        //message.put(p.packageName.getBytes());
                                } catch (UnsupportedEncodingException e1) {
                                        e1.printStackTrace();
                                }
                                System.out.println("Query Class Sent...");

                                break;

                                
                                /* *****NEED TO CHECK WITH Rafael H. Schloming 
*****
                                 * Added new method i.e. writeBin128() in 
AbstractEncoder.java
                                 * (under org.apache.qpidity.transport.codec) 
for encoding 16 bytes of opaque binary data
                                 * 
                                 *      public byte[] writeBin128(byte[] s) {
                                 *  byte[] result = new byte[16];
                                 *  result=s;
                                 *  put(result);
                                 *  return result;
                                 *  }
                                 */
                        case 4:
                                /*
                                 * if user enters 4 then a "Schema request" 
message will be sent to the broker
                                 *   
+-----+-----+-----+-----+-----------------------+
                                | 'A' | 'M' | '1' | 'S' |          seq          
|
                                
+-----+-----+-----+-----+-----------------------+----------+
                                |                packageName (short string)     
           |
                                
+----------------------------------------------------------+
                                |                className (short string)       
           |
                                
+----------------------------------------------------------+
                                |                schema-hash (bin128)           
           |
                                
+----------------------------------------------------------+

                                 */
                                // get the schema hash for VHOST
                                p.getSchemaHash();
                                p.opcode="AM1S";
                                p.sequenceNo=500;
                                p.packageName="qpid";
                                p.classname="vhost";

                                message.clear();
                                try {
                                        // encode the opcode
                                        message.put(p.opcode.getBytes("UTF-8"));
                                        message.putInt( p.sequenceNo);
                                        // encode the package name
                                        encoder.writeStr8(p.packageName);
                                        message.putShort((byte) 
p.packageName.length());
                                        
message.put(p.packageName.getBytes("UTF-8"));
                                        // encode the class name
                                        encoder.writeStr8(p.classname);
                                        message.putShort((byte) 
p.classname.length());
                                        
message.put(p.classname.getBytes("UTF-8"));
                                        // encode the schema hash
                                        // schemaHash data member contains the 
schema hash for the "vhost" (fetching from the file)
                                        encoder.writeBin128(schemaHash);
                                        message.put((byte) schemaHash.length);
                                        message.put(schemaHash);
                                } catch (UnsupportedEncodingException e) {
                                        e.printStackTrace();
                                }

                                System.out.println("Schema Request Sent...");

                                break;


                        case 5:
                                // TODO: DOES NOT WORK
                                p.opcode="AM1M";

                                message.clear();
                                try {
                                        message.put(p.opcode.getBytes("UTF-8"));

                                } catch (UnsupportedEncodingException e1) {
                                        e1.printStackTrace();
                                }
                                System.out.println("Method Request Sent...");

                                break;

                        default:
                                System.out.println("Invalid number.");
                        break;
                        }

                        message.flip();
                        // send message
                        session.data(message);
                        session.endData();
                        // confirm completion
                        session.sync();
                        //cleanup
                        session.sessionDetach(session.getName());
                        try
                        {       // close connection
                                con.close();
                        }
                        catch(Exception e)
                        {
                                System.out.print("Error closing broker 
connection");
                                e.printStackTrace();
                        }


                } catch (Exception e) {
                        e.printStackTrace();
                } 

        }
}
package apache.qpid.client;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;

import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessageListener;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;

import org.apache.qpidity.transport.MessageCreditUnit;

/**
 * This listen messages on reply queue and terminates
 * when it sees the final message
 *
 */
public class ListenerReply implements MessageListener
{
        ByteBuffer buf;

        public void onMessage(Message m)
        {
                try
                {
                        buf = m.readData();
                        // get the Magic number "AM1"
                        char magicNumber = (char)buf.get();
                        System.out.println(magicNumber);
                        magicNumber = (char)buf.get();
                        System.out.println(magicNumber);
                        magicNumber = (char)buf.get();
                        System.out.println(magicNumber);
                        // get the opcode
                        char opcode = (char)buf.get();
                        System.out.println(opcode);
                        System.out.println("I am here");
                        // java decoder
                        ManagementDecoder decoder = new ManagementDecoder(buf);
                        // get the decoded sequence number
                        System.out.println("Sequence No: " + 
decoder.readSequenceNo());
                        
                
                
                        switch (opcode) {
                                        
                        case 'b':
                                
                                /*
                                 *              Broker Response Message 
                                 *       
+-----+-----+-----+-----+-----------------------+
                                        | 'A' | 'M' | '1' | 'b' |           0   
        |
                                        
+-----+-----+-----+-----+-----------------------+----------------------------+
                                        | brokerId (uuid)                       
                                     |
                                        
+----------------------------------------------------------------------------+
                                 */
                                // decode the package name
                                UUID brokerID = decoder.readUuid();
                                System.out.println("Broker Identifier: " + 
brokerID);
                                break;
                                
                        
                        case 'p':
                                /*
                                 *              Package Indication Message
                                 *     
+-----+-----+-----+-----+-----------------------+
                                        | 'A' | 'M' | '1' | 'p' |          seq  
        |
                                        
+-----+-----+-----+-----+-----------------------+----------+
                                        |  package name (short string)          
                   |
                                        
+----------------------------------------------------------+
                                 */
                                // decode the package name
                                String packagenm = decoder.readStr8();
                                System.out.println("Package Name: " + 
packagenm);
                                break;

                                
                                
                                /* Added new method i.e. readBin128() in 
AbstractDecoder.java
                                 * (under org.apache.qpidity.transport.codec) 
for decoding 16 bytes of opaque binary data
                                 * 
                                 * public byte[] readBin128() { 
                                 * byte[] result = new byte[16];
                                 * get(result);
                                 * return result;
                                 * }
                                 */
                        case 'q':
                                /*
                                * if the response is "query indication"
                                *  
+-----+-----+-----+-----+-----------------------+
                        | 'A' | 'M' | '1' | 'q' |          seq          |
                        
+-----+-----+-----+-----+-----------------------+----------+
                        |  package name (short string)                          
   |
                        
+----------------------------------------------------------+
                        |  class name (short string)                            
   |
                        
+----------------------------------------------------------+
                        |  schema hash (bin128)                                 
   |
                        
+----------------------------------------------------------+

                                */
                                // decode the package name
                                String packagename = decoder.readStr8();
                                // decode the class name
                                String classname = decoder.readStr8();
                                System.out.println("Package Name: " + 
packagename);
                                System.out.println("Class Name: " + classname);
                                // decode the schema hash (***Added new method 
i.e. readBin128() in AbstractDecoder.java***)
                                byte[] schemaHash=decoder.readBin128();
                                System.out.println("schema is " + schemaHash);
                                
                                BufferedWriter out = null;
                                // writing the schema hash for VHOST in this 
output file 
                                // we are reusing this value in the 
SendMessage.java while sending the schema request "AM1S" message
                                
                                try {
                                        // Create file 
                                    FileWriter fstream = new 
FileWriter("./conf/info.prop");
                                    out = new BufferedWriter(fstream);
                                    out.write("schema  = ");
                                        out.write(schemaHash + "\n");
                                    //Close the output stream
                                    out.close();
                                }catch (Exception e){//Catch exception if any
                        
                                }
                                break;
        
                                
                        
                        case 's':
                        
                                /*
                                * if the response is "schema response"
                        *    +-----+-----+-----+-----+-----------------------+
                        | 'A' | 'M' | '1' | 's' |          seq          |
                        
+-----+-----+-----+-----+-----------------------+----------+
                        |                packageName (short string)             
   |
                        
+----------------------------------------------------------+
                        |                className (short string)               
   |
                        
+----------------------------------------------------------+
                        |                schema-hash (bin128)                   
   |
                        
+-----------+-----------+-----------+-----------+----------+
                        | propCnt   | statCnt   | methodCnt | eventCnt  |
                        
+-----------+-----------+-----------+-----------+----------------------------+
                        | propCnt property records                              
                     |
                        
+----------------------------------------------------------------------------+
                        | statCnt statistic records                             
                     |
                        
+----------------------------------------------------------------------------+
                        | methodCnt method records                              
                     |
                        
+----------------------------------------------------------------------------+
                        | eventCnt event records                                
                     |
                        
+----------------------------------------------------------------------------+
                                 */
                                String packname = decoder.readStr8();
                                System.out.println("Package Name: " + packname);
                                String clasnam = decoder.readStr8();
                                System.out.println("Class Name: " + clasnam);
                                
                                // get the decoded schema hash
                                decoder.readBin128();
                                // get the decoded properties contents
                                int propCnt = decoder.readUint16();
                                System.out.println("Property content:   " + 
propCnt);
                                long statCnt = decoder.readUint16();
                                System.out.println("Statistic content:   " + 
statCnt);
                                int methodCnt = decoder.readUint16();
                                System.out.println("Method content:   " + 
methodCnt);
                                int eventCnt = decoder.readUint16();
                                System.out.println("Event content:   " + 
eventCnt);
                                
                                for( int i = 0; i < propCnt; i++ )
                                {       // decode the MAP
                                        Map<String,Object> map = 
decoder.readMap();
                                        //printMap(map);
                                }
                                
                                for( int i = 0; i < statCnt; i++ )
                                {       // decode the MAP
                                        Map<String,Object> map = 
decoder.readMap();
                                        //printMap(map);
                                }
                                
                                for( int i = 0; i < methodCnt; i++ )
                                {       // decode the MAP
                                        Map<String,Object> map = 
decoder.readMap();
                                        //printMap(map);
                                }
                                
                                for( int i = 0; i < eventCnt; i++ )
                                {       // decode the MAP
                                        Map<String,Object> map = 
decoder.readMap();
                                        //printMap(map);
                                }
                                    
                                break;
                
                        case 'm':
                        System.out.println("I got it");
                        break;
                        
                        default:
                        break;
                        }
                }
                catch(Exception e)
                {
                        System.out.print("Error reading message");
                        e.printStackTrace();
                }
        }
        
        
        public static void main(String[] args)
        {
                // Create connection
                Connection con = Client.createConnection();
                try
                {       // connect to local host on default port 5672
                        con.connect("localhost", 5672, "test", "guest", 
"guest");
                }
                catch(Exception e)
                {
                        System.out.print("Error connecting to broker");
                        e.printStackTrace();
                }
                // Create session
                Session session = con.createSession(0);
                // Create an instance of the listener
                ListenerReply listener = new ListenerReply();
                // create a subscription with the "reply" queue
                session.messageSubscribe("reply",
                                "listener_reply",
                                Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
                                Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
                                new MessagePartListenerAdapter(listener), null);
                // controls the flow of message data to a given destination
                session.messageFlow("listener_reply", MessageCreditUnit.BYTE, 
Session.MESSAGE_FLOW_MAX_BYTES);
                // will get maximum of 11 messages
                session.messageFlow("listener_reply", 
MessageCreditUnit.MESSAGE, 15);
                // confirm completion
                session.sync();
                // This method cancels a consumer. The server will not send any 
more messages to the "listener_reply" destination.
                session.messageCancel("listener_reply");
                
                //cleanup
                session.sessionDetach(session.getName());
                try
                {       // close connection
                        con.close();
                }
                catch(Exception e)
                {
                        System.out.print("Error closing broker connection");
                        e.printStackTrace();
                }
        }

}

Reply via email to