Hi Arnaud,
I hope you are well.
Apropos of our discussion on the IRC last evening, I am attaching the code
(for keeping the format) with this email for your consideration.
I attached three java files here:
1. Queue.java - to create two queues on the broker
2. SendMessage.java - sends the request to the broker
3. ListenerReply.java - listen and retrieve the responses from the broker
Please check the case 4: in SendMessage (where we request the schema from
the broker)... because when I do this then ListenerReply.java (case 's':)is
not able to get the schema.
-> Am I sending the schema request in correct format? OR am I retriving from
the broker in a correct way? Please help me here.
-> Also please let me know how to invoke the method? What is this field
"input and bidirectional argument values (in schema order)" in AM1M (Method
request format).
I look forward to hearing from you.
Best Regards,
Rahul
package apache.qpid.client;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
import org.apache.qpidity.nclient.Session;
/*
* @author [EMAIL PROTECTED]
*
* this program
* -> connects with the broker on the default port 5672 with the user &
password "guest"
* -> creates two queues in the broker
* 1. "management"
* 2. "reply"
* -> bind these two queues to the exchanges
* 1. "management" queue is bounded on the "qpid.management" exchange with the
routing key "mgmt.#", and
* 2. "reply" queue is bounded on the "amq.direct" exchange with the routing
key "reply"
* -> Broker sends all the management information on the "management" queue
*/
public class Queue {
public static void main(String args[])
{
// Create connection
Connection con = Client.createConnection();
try
{ // connect with the broker on the default port 5672
con.connect("localhost", 5672, "test", "guest",
"guest");
}
catch (Exception e)
{
System.out.print("Error connecting to broker");
e.printStackTrace();
}
// Create a session
Session session = con.createSession(0);
// declare and bind (with "qpid.management" exchange) the
management queue
session.queueDeclare("management", null, null);
session.exchangeBind("management", "qpid.management", "mgmt.#",
null);
// confirm completion
session.sync();
// declare and bind (with "amq.direct" exchange) the reply queue
session.queueDeclare("reply", null, null);
session.exchangeBind("reply", "amq.direct", "reply", null);
// confirm completion
session.sync();
//cleanup
session.sessionDetach(session.getName());
try
{
con.close();
}
catch(Exception e)
{
System.out.print("Error closing broker connection");
e.printStackTrace();
}
}
}
package apache.qpid.client;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Properties;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.MessageAcceptMode;
import org.apache.qpidity.transport.MessageAcquireMode;
import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.transport.ReplyTo;
import org.apache.qpidity.transport.codec.Encoder;
/*
* this sends the "Class Query" and "Schema Request" to the broker
*/
public class SendMessage
{ // AMQP type
String opcode;
int sequenceNo;
String packageName;
String classname;
//byte[] schemaHash=new byte[16];
static byte[] schemaHash;
// this file contains only schema for "vhost" class at run time
private String infoFile="./conf/info.prop";
/*
* ******this is just for testing purpose******
* get the schema for "vhost" class as ListenerReply.java program writes
* the schema for "vhost" in this file (info.prop) when it process the
AM1Q request and
* gives all 14 classes with the schema hash.. then this program writes
the schema hash value for VHOST in
* this file...
*
* Contents of info.prop :
* ------
* schema = [EMAIL PROTECTED]
* ------
* We are using this value while requesting AM1S as the format is:
* opcode = AM1S
* Package name= qpid
* Class name= vhost
* Schema Hash= [EMAIL PROTECTED]
*
*/
public void getSchemaHash() throws FileNotFoundException, IOException
{
Properties prop = new Properties();
prop.load(new FileInputStream(new File(infoFile)));
// getting [EMAIL PROTECTED] into schemaHash
schemaHash = prop.getProperty("schema").getBytes();
}
public static void main(String[] args)
{
SendMessage p=new SendMessage();
// Create connection
Connection con = Client.createConnection();
ByteBuffer message= ByteBuffer.allocate(500);
ManagementEncoder encoder=new ManagementEncoder(message);
try
{ // connect to local host on default port 5672
con.connect("localhost", 5672, "test", "guest",
"guest");
}
catch(Exception e)
{
System.out.print("Error connecting to broker");
e.printStackTrace();
}
// Create session
Session session = con.createSession(0);
DeliveryProperties deliveryProps = new DeliveryProperties();
// set the routing key as "agent"
deliveryProps.setRoutingKey("agent");
MessageProperties messageProps=new MessageProperties();
// set replyTo field so that messages return to the "reply"
queue
ReplyTo rpt=new ReplyTo();
rpt.setExchange("amq.direct");
rpt.setRoutingKey("reply");
messageProps.setReplyTo(rpt);
// transfer message to "qpid.management" exchange
session.messageTransfer("qpid.management",
MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED);
session.header(deliveryProps,messageProps);
try {
System.out.println();
System.out.println("1. Broker Query Request");
System.out.println("2. Package Query Request");
System.out.println("3. Class Query Request");
System.out.println("4. Schema Query Request");
System.out.println("5. Method Request");
System.out.println("0. Exit");
System.out.println();
System.out.println("Please Enter your choice for
REQUEST");
BufferedReader br = new BufferedReader(new
InputStreamReader(System.in));
int choice=0;
choice = Integer.parseInt(br.readLine().trim());
switch (choice)
{
// if press 0 then Exit
case 0:
System.exit(0);
break;
case 1:
/*
* if user enters 1 then a "Broker Query"
message will be sent to the broker
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'B' | 0
|
+-----+-----+-----+-----+-----------------------+
The Broker Request message has
no payload.
*/
p.opcode="AM1B";
message.clear();
try {
message.put(p.opcode.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e1) {
e1.printStackTrace();
}
System.out.println("Query Broker Sent...");
break;
case 2:
/*
* if user enters 2 then a "Package Query"
message will be sent to the broker
*
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'P' |
seq |
+-----+-----+-----+-----+-----------------------+
*
*/
p.opcode="AM1P";
message.clear();
try {
message.put(p.opcode.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e1) {
e1.printStackTrace();
}
System.out.println("Query Package Sent...");
break;
case 3:
/*
* if user enters 3 then a "Class Query"
message will be sent to the broker
*
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'Q' | seq
|
+-----+-----+-----+-----+-----------------------+----------+
| package name (short string)
|
+----------------------------------------------------------+
*/
p.opcode="AM1Q";
p.sequenceNo=600;
p.packageName="qpid";
message.clear();
try {
message.put(p.opcode.getBytes("UTF-8"));
message.putInt( p.sequenceNo);
encoder.writeStr8(p.packageName);
message.putShort((byte)
p.packageName.length());
message.put(p.packageName.getBytes("UTF-8"));
//message.put(p.packageName.getBytes());
} catch (UnsupportedEncodingException e1) {
e1.printStackTrace();
}
System.out.println("Query Class Sent...");
break;
/* *****NEED TO CHECK WITH Rafael H. Schloming
*****
* Added new method i.e. writeBin128() in
AbstractEncoder.java
* (under org.apache.qpidity.transport.codec)
for encoding 16 bytes of opaque binary data
*
* public byte[] writeBin128(byte[] s) {
* byte[] result = new byte[16];
* result=s;
* put(result);
* return result;
* }
*/
case 4:
/*
* if user enters 4 then a "Schema request"
message will be sent to the broker
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'S' | seq
|
+-----+-----+-----+-----+-----------------------+----------+
| packageName (short string)
|
+----------------------------------------------------------+
| className (short string)
|
+----------------------------------------------------------+
| schema-hash (bin128)
|
+----------------------------------------------------------+
*/
// get the schema hash for VHOST
p.getSchemaHash();
p.opcode="AM1S";
p.sequenceNo=500;
p.packageName="qpid";
p.classname="vhost";
message.clear();
try {
// encode the opcode
message.put(p.opcode.getBytes("UTF-8"));
message.putInt( p.sequenceNo);
// encode the package name
encoder.writeStr8(p.packageName);
message.putShort((byte)
p.packageName.length());
message.put(p.packageName.getBytes("UTF-8"));
// encode the class name
encoder.writeStr8(p.classname);
message.putShort((byte)
p.classname.length());
message.put(p.classname.getBytes("UTF-8"));
// encode the schema hash
// schemaHash data member contains the
schema hash for the "vhost" (fetching from the file)
encoder.writeBin128(schemaHash);
message.put((byte) schemaHash.length);
message.put(schemaHash);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
System.out.println("Schema Request Sent...");
break;
case 5:
// TODO: DOES NOT WORK
p.opcode="AM1M";
message.clear();
try {
message.put(p.opcode.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e1) {
e1.printStackTrace();
}
System.out.println("Method Request Sent...");
break;
default:
System.out.println("Invalid number.");
break;
}
message.flip();
// send message
session.data(message);
session.endData();
// confirm completion
session.sync();
//cleanup
session.sessionDetach(session.getName());
try
{ // close connection
con.close();
}
catch(Exception e)
{
System.out.print("Error closing broker
connection");
e.printStackTrace();
}
} catch (Exception e) {
e.printStackTrace();
}
}
}package apache.qpid.client;
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessageListener;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.transport.MessageCreditUnit;
/**
* This listen messages on reply queue and terminates
* when it sees the final message
*
*/
public class ListenerReply implements MessageListener
{
ByteBuffer buf;
public void onMessage(Message m)
{
try
{
buf = m.readData();
// get the Magic number "AM1"
char magicNumber = (char)buf.get();
System.out.println(magicNumber);
magicNumber = (char)buf.get();
System.out.println(magicNumber);
magicNumber = (char)buf.get();
System.out.println(magicNumber);
// get the opcode
char opcode = (char)buf.get();
System.out.println(opcode);
System.out.println("I am here");
// java decoder
ManagementDecoder decoder = new ManagementDecoder(buf);
// get the decoded sequence number
System.out.println("Sequence No: " +
decoder.readSequenceNo());
switch (opcode) {
case 'b':
/*
* Broker Response Message
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'b' | 0
|
+-----+-----+-----+-----+-----------------------+----------------------------+
| brokerId (uuid)
|
+----------------------------------------------------------------------------+
*/
// decode the package name
UUID brokerID = decoder.readUuid();
System.out.println("Broker Identifier: " +
brokerID);
break;
case 'p':
/*
* Package Indication Message
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'p' | seq
|
+-----+-----+-----+-----+-----------------------+----------+
| package name (short string)
|
+----------------------------------------------------------+
*/
// decode the package name
String packagenm = decoder.readStr8();
System.out.println("Package Name: " +
packagenm);
break;
/* Added new method i.e. readBin128() in
AbstractDecoder.java
* (under org.apache.qpidity.transport.codec)
for decoding 16 bytes of opaque binary data
*
* public byte[] readBin128() {
* byte[] result = new byte[16];
* get(result);
* return result;
* }
*/
case 'q':
/*
* if the response is "query indication"
*
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'q' | seq |
+-----+-----+-----+-----+-----------------------+----------+
| package name (short string)
|
+----------------------------------------------------------+
| class name (short string)
|
+----------------------------------------------------------+
| schema hash (bin128)
|
+----------------------------------------------------------+
*/
// decode the package name
String packagename = decoder.readStr8();
// decode the class name
String classname = decoder.readStr8();
System.out.println("Package Name: " +
packagename);
System.out.println("Class Name: " + classname);
// decode the schema hash (***Added new method
i.e. readBin128() in AbstractDecoder.java***)
byte[] schemaHash=decoder.readBin128();
System.out.println("schema is " + schemaHash);
BufferedWriter out = null;
// writing the schema hash for VHOST in this
output file
// we are reusing this value in the
SendMessage.java while sending the schema request "AM1S" message
try {
// Create file
FileWriter fstream = new
FileWriter("./conf/info.prop");
out = new BufferedWriter(fstream);
out.write("schema = ");
out.write(schemaHash + "\n");
//Close the output stream
out.close();
}catch (Exception e){//Catch exception if any
}
break;
case 's':
/*
* if the response is "schema response"
* +-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 's' | seq |
+-----+-----+-----+-----+-----------------------+----------+
| packageName (short string)
|
+----------------------------------------------------------+
| className (short string)
|
+----------------------------------------------------------+
| schema-hash (bin128)
|
+-----------+-----------+-----------+-----------+----------+
| propCnt | statCnt | methodCnt | eventCnt |
+-----------+-----------+-----------+-----------+----------------------------+
| propCnt property records
|
+----------------------------------------------------------------------------+
| statCnt statistic records
|
+----------------------------------------------------------------------------+
| methodCnt method records
|
+----------------------------------------------------------------------------+
| eventCnt event records
|
+----------------------------------------------------------------------------+
*/
String packname = decoder.readStr8();
System.out.println("Package Name: " + packname);
String clasnam = decoder.readStr8();
System.out.println("Class Name: " + clasnam);
// get the decoded schema hash
decoder.readBin128();
// get the decoded properties contents
int propCnt = decoder.readUint16();
System.out.println("Property content: " +
propCnt);
long statCnt = decoder.readUint16();
System.out.println("Statistic content: " +
statCnt);
int methodCnt = decoder.readUint16();
System.out.println("Method content: " +
methodCnt);
int eventCnt = decoder.readUint16();
System.out.println("Event content: " +
eventCnt);
for( int i = 0; i < propCnt; i++ )
{ // decode the MAP
Map<String,Object> map =
decoder.readMap();
//printMap(map);
}
for( int i = 0; i < statCnt; i++ )
{ // decode the MAP
Map<String,Object> map =
decoder.readMap();
//printMap(map);
}
for( int i = 0; i < methodCnt; i++ )
{ // decode the MAP
Map<String,Object> map =
decoder.readMap();
//printMap(map);
}
for( int i = 0; i < eventCnt; i++ )
{ // decode the MAP
Map<String,Object> map =
decoder.readMap();
//printMap(map);
}
break;
case 'm':
System.out.println("I got it");
break;
default:
break;
}
}
catch(Exception e)
{
System.out.print("Error reading message");
e.printStackTrace();
}
}
public static void main(String[] args)
{
// Create connection
Connection con = Client.createConnection();
try
{ // connect to local host on default port 5672
con.connect("localhost", 5672, "test", "guest",
"guest");
}
catch(Exception e)
{
System.out.print("Error connecting to broker");
e.printStackTrace();
}
// Create session
Session session = con.createSession(0);
// Create an instance of the listener
ListenerReply listener = new ListenerReply();
// create a subscription with the "reply" queue
session.messageSubscribe("reply",
"listener_reply",
Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
new MessagePartListenerAdapter(listener), null);
// controls the flow of message data to a given destination
session.messageFlow("listener_reply", MessageCreditUnit.BYTE,
Session.MESSAGE_FLOW_MAX_BYTES);
// will get maximum of 11 messages
session.messageFlow("listener_reply",
MessageCreditUnit.MESSAGE, 15);
// confirm completion
session.sync();
// This method cancels a consumer. The server will not send any
more messages to the "listener_reply" destination.
session.messageCancel("listener_reply");
//cleanup
session.sessionDetach(session.getName());
try
{ // close connection
con.close();
}
catch(Exception e)
{
System.out.print("Error closing broker connection");
e.printStackTrace();
}
}
}