Hi Arnaud,
Original Code is here which I attached in my previous email (I will update
it with your previous comments e.g. remove printMap method)
SendMessage.java:
package apache.qpid.client;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.MessageAcceptMode;
import org.apache.qpidity.transport.MessageAcquireMode;
import org.apache.qpidity.transport.MessageProperties;
import org.apache.qpidity.transport.ReplyTo;
/*
* this sends the "Class Query" and "Schema Request" to the broker
*/
public class SendMessage
{ // AMQP type
String opcode;
/* int sequenceNo;
String packageName;
String schemaHash;
String classname;*/
public static void main(String[] args)
{
SendMessage p=new SendMessage();
// Create connection
Connection con = Client.createConnection();
ByteBuffer message= ByteBuffer.allocate(100);
try
{ // connect to local host on default port 5672
con.connect("localhost", 5672, "test", "guest", "guest");
}
catch(Exception e)
{
System.out.print("Error connecting to broker");
e.printStackTrace();
}
// Create session
Session session = con.createSession(0);
DeliveryProperties deliveryProps = new DeliveryProperties();
// set the routing key as "agent"
deliveryProps.setRoutingKey("agent");
MessageProperties messageProps=new MessageProperties();
// set replyTo field so that messages return to the "reply" queue
ReplyTo rpt=new ReplyTo();
rpt.setExchange("amq.direct");
rpt.setRoutingKey("reply");
messageProps.setReplyTo(rpt);
// transfer message to "qpid.management" exchange
session.messageTransfer("qpid.management",
MessageAcceptMode.EXPLICIT,MessageAcquireMode.PRE_ACQUIRED);
session.header(deliveryProps,messageProps);
/*
* JUST FOR TEST
* Read input from keyboard
* if user enters 1 so it sends the "Class Query" or if
* enters 2 then it means client sends
* the "Schema Request"
*/
System.out.println("Enter an opertion ");
BufferedReader br = new BufferedReader(new InputStreamReader(System.in));
int i=0;
try {
i = Integer.parseInt(br.readLine().trim());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
switch (i)
{
/*
* if user enters 1 then a "Class Query" message will be
sent to the broker
*
* +-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'Q' | seq |
+-----+-----+-----+-----+-----------------------+----------+
| package name (short string) |
+----------------------------------------------------------+
*/
case 1:
p.opcode="AM1Q";
//p.sequenceNo=100; p.packageName="qpid"; p.classname="exchange";
/* * I am wondering I am sending only the type/opcode (e.g. AM1Q) and it
works * unlike what the format mentioned above (according to
management-design notes) *
http://cwiki.apache.org/qpid/management-design-notes.html
*/
message.clear();
try {
message.put(p.opcode.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
break;
/*
* if user enters 2 then a "Schema request" message will be sent to the
broker
* +-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'S' | seq |
+-----+-----+-----+-----+-----------------------+----------+
| packageName (short string) |
+----------------------------------------------------------+
| className (short string) |
+----------------------------------------------------------+
| schema-hash (bin128) |
+----------------------------------------------------------+
*/
case 2:
p.opcode="AM1S";
//p.sequenceNo=100; p.packageName="qpid"; p.classname="exchange";
/* * I am wondering I am only sending the type/opcode (e.g. AM1S) and it
works... * unlike what the format mentioned above (according to
management-design notes) *
http://cwiki.apache.org/qpid/management-design-notes.html */
message.clear();
try {
message.put(p.opcode.getBytes("UTF-8"));
} catch (UnsupportedEncodingException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
break;
default:
break;
}
message.flip();
// send message
session.data(message);
session.endData();
// confirm completion
session.sync();
//cleanup
session.sessionDetach(session.getName());
try
{ // clise connection
con.close();
}
catch(Exception e)
{
System.out.print("Error closing broker connection");
e.printStackTrace();
}
}
}
ListenerReply.java:
package apache.qpid.client;
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.qpidity.api.Message;
import org.apache.qpidity.nclient.Client;
import org.apache.qpidity.nclient.Connection;
import org.apache.qpidity.nclient.Session;
import org.apache.qpidity.nclient.util.MessageListener;
import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
import org.apache.qpidity.transport.MessageCreditUnit;
/**
* This listen messages on reply queue and terminates
* when it sees the final message
*
*/
public class ListenerReply implements MessageListener
{
ByteBuffer buf;
public void onMessage(Message m)
{
try
{
buf = m.readData();
// get the Magic number "AM1"
char magicNumber = (char)buf.get();
System.out.println(magicNumber);
magicNumber = (char)buf.get();
System.out.println(magicNumber);
magicNumber = (char)buf.get();
System.out.println(magicNumber);
// get the opcode
char opcode = (char)buf.get();
System.out.println(opcode);
// java decoder
ManagementDecoder decoder = new ManagementDecoder(buf);
// get the decoded sequence number
System.out.println(decoder.readSequenceNo());
switch (opcode) {
/*
* if the response is "query indication"
* +-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 'q' | seq |
+-----+-----+-----+-----+-----------------------+----------+
| package name (short string) |
+----------------------------------------------------------+
| class name (short string) |
+----------------------------------------------------------+
| schema hash (bin128) |
+----------------------------------------------------------+
*/
case 'q':
// decode the package name
String packagename = decoder.readStr8();
// decode the class name
String classname = decoder.readStr8();
System.out.println("Package Name: " + packagename);
System.out.println("Class Name: " + classname);
// decode the schema hash // TODO: this
prints the hexa value of schema... need to be printed in readable
System.out.println("Schema hash is: " +
org.apache.qpidity.transport.util.Functions.str(decoder.readBin128()));
//System.out.println("Schema hash is: " + decoder.readUint64() +
decoder.readUint64()); break;
/*
* if the response is "schema response"
* +-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 's' | seq |
+-----+-----+-----+-----+-----------------------+----------+
| packageName (short string) |
+----------------------------------------------------------+
| className (short string) |
+----------------------------------------------------------+
| schema-hash (bin128) |
+-----------+-----------+-----------+-----------+----------+
| propCnt | statCnt | methodCnt | eventCnt |
+-----------+-----------+-----------+-----------+----------------------------+
| propCnt property records |
+----------------------------------------------------------------------------+
| statCnt statistic records |
+----------------------------------------------------------------------------+
| methodCnt method records |
+----------------------------------------------------------------------------+
| eventCnt event records |
+----------------------------------------------------------------------------+
*/
case 's':
String packname = decoder.readStr8();
String clasnam = decoder.readStr8();
System.out.println("Package Name: " + packname);
System.out.println("Class Name: " + clasnam); //
Added new method i.e. readBin128() in AbstractDecoder.java (under
org.apache.qpidity.transport.codec) for decoding 16 bytes of opaque binary
data // TODO: this prints the hexa value of schema... need to be printed in
readable System.out.println("Schema is: " +
org.apache.qpidity.transport.util.Functions.str(decoder.readBin128()));
// get the decoded properties contents
int propCnt = decoder.readUint16();
System.out.println("Property content: " + propCnt);
for( int i = 0; i < propCnt; i++ )
{ // decode the MAP
// FIXME: getting exception here "java.lang.IllegalArgumentException:
// unknown code: 6"
Map<String,Object> map = decoder.readMap();
printMap(map);
}
// TODO: Also do the same for statistic records, method records,
// event records
break;
default:
break;
}
}
catch(Exception e)
{
System.out.print("Error reading message");
e.printStackTrace();
}
}
/*
* print the decoded map
* TODO: fix the warning and comment it when it works
*/
public void printMap(Map mapnew)
{
Set s=mapnew.entrySet();
Iterator it=s.iterator();
for (Iterator iterator = s.iterator(); iterator.hasNext();)
{
Object object = (Object) iterator.next();
System.out.println(object.toString());
}
}
public static void main(String[] args)
{
// Create connection
Connection con = Client.createConnection();
try
{ // connect to local host on default port 5672
con.connect("localhost", 5672, "test", "guest", "guest");
}
catch(Exception e)
{
System.out.print("Error connecting to broker");
e.printStackTrace();
}
// Create session
Session session = con.createSession(0);
// Create an instance of the listener
ListenerReply listener = new ListenerReply();
// create a subscription with the "reply" queue
session.messageSubscribe("reply",
"listener_reply",
Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
new MessagePartListenerAdapter(listener), null);
// controls the flow of message data to a given destination
session.messageFlow("listener_reply",
MessageCreditUnit.BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
// will get maximum of 11 messages
session.messageFlow("listener_reply",
MessageCreditUnit.MESSAGE, 11);
// confirm completion
session.sync();
// This method cancels a consumer. The server will not
send any more messages to the "listener_reply" destination.
session.messageCancel("listener_reply");
//cleanup
session.sessionDetach(session.getName());
try
{ // close connection
con.close();
}
catch(Exception e)
{
System.out.print("Error closing broker connection");
e.printStackTrace();
}
}
}
-------
Best Regards,
Rahul
On Jun 27 2008, Arnaud Simon wrote:
Hi,
> Rafael mentioned last night that AbastractDecoder does not know it so
> we added another method i.e. readBin128() in AbstractDecoder.java
> (under org.apache.qpidity.transport.codec) for decoding 16 bytes of
> opaque binary data. It produced some unknown characters (e.g. output:
> Schema hash is:
> �\�W�p�`؞??V�?) as schema hash
> is an md5 digest and md5 hashes are binary data so in this case I was
> converting to a string like this System.out.println("Schema hash is " +
> new String(decoder.readBin128()));
>
> So I converted this non printable characters to hex (with Rafael's
> advice just to get rid off these some unknown character) by using
> org.apache.qpidity.transport.util.Functions.str(decoder.readBin128())
> method. so the schema hash printed as (Schema is:
> "\x85\x5c\xea\x96W\xe3p\xcc`\xd8\x9e\x1a\x08V\xa4\x0e")...
>
> 1) I want to see the Schema hash in the correct way ? How would I
> print it?
I don't think you want to print the hash, you only need it for sending
further requests. So, the hash is only 16 bytes of opaque binary, that's
all.
> -----
>
> As i was ignoring what i was getting for "schema hash" field (hax
> value) and moving on to next field... i.e. propCnt (properties
> content), i am doing this
> int propCnt = decoder.readUint16();
> System.out.println("Property content: " + propCnt);
>
> for( int i = 0; i < propCnt; i++ )
> {
> Map<String,Object> map = decoder.readMap();
> printMap(map);
> }
> public void printMap(Map mapnew)
> {
> Set s=mapnew.entrySet();
> Iterator it=s.iterator();
> for (Iterator iterator = s.iterator(); iterator.hasNext();) {
> Object object = (Object) iterator.next();
> System.out.println(object.toString());
>
> }
>
> BUT, getting an exception here
> A
> M
> 1
> s
> 15
> Package Name: qpid
> Class Name: vhost
> Schema is: "\x85\x5c\xea\x96W\xe3p\xcc`\xd8\x9e\x1a\x08V\xa4\x0e"
> Property content: 2
> code is : 6
>
> Error reading messagejava.lang.IllegalArgumentException: unknown code: 6
> at
> org.apache.qpidity.transport.codec.AbstractDecoder.getType(AbstractDecoder.java:302)
> at
> org.apache.qpidity.transport.codec.AbstractDecoder.readMap(AbstractDecoder.java:253)
> at apache.qpid.client.ListenerReply.onMessage(ListenerReply.java:109)
> at
> org.apache.qpidity.nclient.util.MessagePartListenerAdapter.messageReceived(MessagePartListenerAdapter.java:56)
> at
> org.apache.qpidity.nclient.impl.ClientSessionDelegate.data(ClientSessionDelegate.java:41)
> at
> org.apache.qpidity.transport.SessionDelegate.data(SessionDelegate.java:1)
> at org.apache.qpidity.transport.Data.delegate(Data.java:78)
> at org.apache.qpidity.transport.Channel.data(Channel.java:114)
> at org.apache.qpidity.transport.Channel.data(Channel.java:1)
> at org.apache.qpidity.transport.Data.delegate(Data.java:78)
> at org.apache.qpidity.transport.Channel.received(Channel.java:75)
> at org.apache.qpidity.transport.Connection.received(Connection.java:84)
> at org.apache.qpidity.transport.Connection.received(Connection.java:1)
> at
> org.apache.qpidity.transport.network.Assembler.emit(Assembler.java:92)
> at
> org.apache.qpidity.transport.network.Assembler.emit(Assembler.java:97)
> at
> org.apache.qpidity.transport.network.Assembler.frame(Assembler.java:125)
> at org.apache.qpidity.transport.network.Frame.delegate(Frame.java:145)
> at
> org.apache.qpidity.transport.network.Assembler.received(Assembler.java:102)
> at
> org.apache.qpidity.transport.network.Assembler.received(Assembler.java:1)
> at
> org.apache.qpidity.transport.network.InputHandler.frame(InputHandler.java:103)
> at
> org.apache.qpidity.transport.network.InputHandler.next(InputHandler.java:204)
> at
> org.apache.qpidity.transport.network.InputHandler.received(InputHandler.java:116)
> at
> org.apache.qpidity.transport.network.InputHandler.received(InputHandler.java:1)
> at
> org.apache.qpidity.transport.network.mina.MinaHandler.messageReceived(MinaHandler.java:87)
> at
> org.apache.mina.common.support.AbstractIoFilterChain$TailFilter.messageReceived(AbstractIoFilterChain.java:703)
> at
> org.apache.mina.common.support.AbstractIoFilterChain.callNextMessageReceived(AbstractIoFilterChain.java:362)
> at
> org.apache.mina.common.support.AbstractIoFilterChain.access$1200(AbstractIoFilterChain.java:54)
> at
> org.apache.mina.common.support.AbstractIoFilterChain$EntryImpl$1.messageReceived(AbstractIoFilterChain.java:800)
> at
> org.apache.mina.common.support.AbstractIoFilterChain$HeadFilter.messageReceived(AbstractIoFilterChain.java:617)
> at
> org.apache.mina.common.support.AbstractIoFilterChain.callNextMessageReceived(AbstractIoFilterChain.java:362)
> at
> org.apache.mina.common.support.AbstractIoFilterChain.fireMessageReceived(AbstractIoFilterChain.java:353)
> at
> org.apache.mina.transport.socket.nio.SocketIoProcessor.read(SocketIoProcessor.java:281)
> at
> org.apache.mina.transport.socket.nio.SocketIoProcessor.process(SocketIoProcessor.java:241)
> at
> org.apache.mina.transport.socket.nio.SocketIoProcessor.access$500(SocketIoProcessor.java:44)
> at
> org.apache.mina.transport.socket.nio.SocketIoProcessor$Worker.run(SocketIoProcessor.java:559)
> at
> org.apache.mina.util.NamePreservingRunnable.run(NamePreservingRunnable.java:43)
> at java.lang.Thread.run(Thread.java:619)
>
>
> I have just an idea why it is raising but how to mitigate it I dont
> know.
>
> 2) How to fix it? Can anyone help me?
Looking at http://cwiki.apache.org/qpid/management-design-notes.html,
the message format is as follows:
+-----+-----+-----+-----+-----------------------+
| 'A' | 'M' | '1' | 's' | seq |
+-----+-----+-----+-----+-----------------------+----------+
| packageName (short string) |
+----------------------------------------------------------+
| className (short string) |
+----------------------------------------------------------+
| schema-hash (bin128) |
+-----------+-----------+-----------+-----------+----------+
| propCnt | statCnt | methodCnt | eventCnt |
+-----------+-----------+-----------+-----------+----------------------------+
| propCnt property records |
+----------------------------------------------------------------------------+
| statCnt statistic records |
+----------------------------------------------------------------------------+
| methodCnt method records |
+----------------------------------------------------------------------------+
| eventCnt event records |
+----------------------------------------------------------------------------+
This means that you need to read propCnt, statCnt, methodCnt and then
eventCnt before you can read propCnt maps. statCnt maps, methodCnt maps
and then eventCnt maps. You are currently not reading statCnt, methodCnt
and eventCnt.
Arnaud