Robert Godfrey wrote:
I think to avoid wasted work, it would be useful before making a
traight implementation of the C++ management API over JMS if we could
first document the commonalities and differences between the two
interfaces (i.e. the existing Java Broker JMS interface and the
existing C++ AMQP interface)
It would help a lot if we could bring them as close as possible together
Attached is some very rough data (taken from MBean defs and schema xml)
to help start that process. They aren't yet in the same form I'm afraid,
I've just tried to cut out some noise to get a quick overview.
In terms of the object classes, theres quite a lot of similarity (java
has a user management object that c++ does not; c++ models producers and
consumers distinctly which I don't think java does(?)). The properties
and methods exposed are along similar lines but do vary a fair bit
between the two in naming etc.
One of the most obvious differences to my eye is that java allows more
access to (and manipulation of) the messages on a queue.
Other observations?
Access rights for configuration elements:
RO => Read Only
RC => Read/Create, can be set at create time only, read-only
thereafter
RW => Read/Write
If access rights are omitted for a configElement, they are assumed
to be RO.
System
<configElement name="sysId" index="y" type="uuid" access="RC"/>
<configElement name="osName" type="sstr" access="RO" desc="Operating
System Name"/>
<configElement name="nodeName" type="sstr" access="RO" desc="Node Name"/>
<configElement name="release" type="sstr" access="RO"/>
<configElement name="version" type="sstr" access="RO"/>
<configElement name="machine" type="sstr" access="RO"/>
Broker
<configElement name="systemRef" type="objId" access="RC"
index="y" desc="System ID" parentRef="y"/>
<configElement name="port" type="uint16" access="RC"
index="y" desc="TCP Port for AMQP Service"/>
<configElement name="workerThreads" type="uint16" access="RO"
desc="Thread pool size"/>
<configElement name="maxConns" type="uint16" access="RO"
desc="Maximum allowed connections"/>
<configElement name="connBacklog" type="uint16" access="RO"
desc="Connection backlog limit for listening socket"/>
<configElement name="stagingThreshold" type="uint32" access="RO"
desc="Broker stages messages over this size to disk"/>
<configElement name="mgmtPubInterval" type="uint16" access="RW"
unit="second" min="1" desc="Interval for management broadcasts"/>
<configElement name="clusterName" type="sstr" access="RO"
desc="Name of cluster this server is a member of"/>
<configElement name="version" type="sstr" access="RO"
desc="Running software version"/>
<configElement name="dataDirEnabled" type="bool" access="RO"
desc="Persistent configuration storage enabled"/>
<configElement name="dataDir" type="sstr" access="RO"
desc="Persistent configuration storage location"/>
<method name="joinCluster">
<arg name="clusterName" dir="I" type="sstr"/>
</method>
<method name="leaveCluster"/>
<method name="echo" desc="Request a response to test the path to the
management agent">
<arg name="sequence" dir="IO" type="uint32" default="0"/>
<arg name="body" dir="IO" type="lstr" default=""/>
</method>
<method name="connect" desc="Establish a connection to another broker">
<arg name="host" dir="I" type="sstr" default=""/>
<arg name="port" dir="I" type="uint32" default="0"/>
</method>
Virtual Host
<configElement name="brokerRef" type="objId" access="RC" index="y"
parentRef="y"/>
<configElement name="name" type="sstr" access="RC" index="y"/>
Queue
<configElement name="vhostRef" type="objId" access="RC" index="y"
parentRef="y"/>
<configElement name="name" type="sstr" access="RC" index="y"/>
<configElement name="durable" type="bool" access="RC"/>
<configElement name="autoDelete" type="bool" access="RC"/>
<configElement name="exclusive" type="bool" access="RC"/>
<configElement name="arguments" type="ftable" access="RO"
desc="Arguments supplied in queue.declare"/>
<configElement name="storeRef" type="objId" access="RO"
desc="Reference to persistent queue (if durable)"/>
<instElement name="msgTotalEnqueues" type="count64" unit="message"
desc="Total messages enqueued"/>
<instElement name="msgTotalDequeues" type="count64" unit="message"
desc="Total messages dequeued"/>
<instElement name="msgTxnEnqueues" type="count64" unit="message"
desc="Transactional messages enqueued"/>
<instElement name="msgTxnDequeues" type="count64" unit="message"
desc="Transactional messages dequeued"/>
<instElement name="msgPersistEnqueues" type="count64" unit="message"
desc="Persistent messages enqueued"/>
<instElement name="msgPersistDequeues" type="count64" unit="message"
desc="Persistent messages dequeued"/>
<instElement name="msgDepth" type="hilo32" unit="message"
desc="Current size of queue in messages"/>
<instElement name="byteTotalEnqueues" type="count64" unit="octet"
desc="Total messages enqueued"/>
<instElement name="byteTotalDequeues" type="count64" unit="octet"
desc="Total messages dequeued"/>
<instElement name="byteTxnEnqueues" type="count64" unit="octet"
desc="Transactional messages enqueued"/>
<instElement name="byteTxnDequeues" type="count64" unit="octet"
desc="Transactional messages dequeued"/>
<instElement name="bytePersistEnqueues" type="count64" unit="octet"
desc="Persistent messages enqueued"/>
<instElement name="bytePersistDequeues" type="count64" unit="octet"
desc="Persistent messages dequeued"/>
<instElement name="byteDepth" type="hilo32" unit="octet"
desc="Current size of queue in bytes"/>
<instElement name="enqueueTxnStarts" type="count64" unit="transaction"
desc="Total enqueue transactions started "/>
<instElement name="enqueueTxnCommits" type="count64" unit="transaction"
desc="Total enqueue transactions committed"/>
<instElement name="enqueueTxnRejects" type="count64" unit="transaction"
desc="Total enqueue transactions rejected"/>
<instElement name="enqueueTxnCount" type="hilo32" unit="transaction"
desc="Current pending enqueue transactions"/>
<instElement name="dequeueTxnStarts" type="count64" unit="transaction"
desc="Total dequeue transactions started"/>
<instElement name="dequeueTxnCommits" type="count64" unit="transaction"
desc="Total dequeue transactions committed"/>
<instElement name="dequeueTxnRejects" type="count64" unit="transaction"
desc="Total dequeue transactions rejected"/>
<instElement name="dequeueTxnCount" type="hilo32" unit="transaction"
desc="Current pending dequeue transactions"/>
<instElement name="consumers" type="hilo32" unit="consumer"
desc="Current consumers on queue"/>
<instElement name="bindings" type="hilo32" unit="binding"
desc="Current bindings"/>
<instElement name="unackedMessages" type="hilo32" unit="message"
desc="Messages consumed but not yet acked"/>
<instElement name="messageLatency" type="mmaTime" unit="nanosecond"
desc="Broker latency through this queue"/>
<method name="purge" desc="Discard all messages on queue"/>
Exchange
<configElement name="vhostRef" type="objId" access="RC" index="y"
parentRef="y"/>
<configElement name="name" type="sstr" access="RC" index="y"/>
<configElement name="type" type="sstr" access="RO"/>
<instElement name="producers" type="hilo32" desc="Current producers on
exchange"/>
<instElement name="bindings" type="hilo32" desc="Current bindings"/>
<instElement name="msgReceives" type="count64" desc="Total messages
received"/>
<instElement name="msgDrops" type="count64" desc="Total messages
dropped (no matching key)"/>
<instElement name="msgRoutes" type="count64" desc="Total routed
messages"/>
<instElement name="byteReceives" type="count64" desc="Total bytes
received"/>
<instElement name="byteDrops" type="count64" desc="Total bytes dropped
(no matching key)"/>
<instElement name="byteRoutes" type="count64" desc="Total routed bytes"/>
Binding
<configElement name="exchangeRef" type="objId" access="RC" index="y"
parentRef="y"/>
<configElement name="queueRef" type="objId" access="RC" index="y"/>
<configElement name="bindingKey" type="sstr" access="RC" index="y"/>
<configElement name="arguments" type="ftable" access="RC"/>
<instElement name="msgMatched" type="count64"/>
Client (i.e. Connection)
<configElement name="vhostRef" type="objId" access="RC" index="y"
parentRef="y"/>
<configElement name="address" type="sstr" access="RC" index="y"/>
<instElement name="closing" type="bool" desc="This client is
closing by management request"/>
<instElement name="authIdentity" type="sstr"/>
<instElement name="framesFromClient" type="count64"/>
<instElement name="framesToClient" type="count64"/>
<instElement name="bytesFromClient" type="count64"/>
<instElement name="bytesToClient" type="count64"/>
<method name="close"/>
Session
<configElement name="vhostRef" type="objId" access="RC" index="y"
parentRef="y"/>
<configElement name="name" type="sstr" access="RC" index="y"/>
<configElement name="channelId" type="uint16" access="RO"/>
<configElement name="clientRef" type="objId" access="RO"/>
<configElement name="detachedLifespan" type="uint32" access="RO"
unit="second"/>
<instElement name="attached" type="bool"/>
<instElement name="expireTime" type="absTime"/>
<instElement name="framesOutstanding" type="count32"/>
<method name="solicitAck"/>
<method name="detach"/>
<method name="resetLifespan"/>
<method name="close"/>
Destination
<configElement name="sessionRef" type="objId" access="RC" index="y"
parentRef="y"/>
<configElement name="name" type="sstr" access="RC" index="y"/>
<instElement name="flowMode" type="uint8"/>
<instElement name="maxMsgCredits" type="uint32"/>
<instElement name="maxByteCredits" type="uint32"/>
<instElement name="msgCredits" type="uint32"/>
<instElement name="byteCredits" type="uint32"/>
<method name="throttle" desc="Apply extra rate limiting to destination: 0 =
Normal, 10 = Maximum">
<arg name="strength" type="uint8" dir="I" min="0" max="10"/>
</method>
<method name="stop"/>
<method name="start"/>
Producer
<configElement name="destinationRef" access="RC" type="objId" index="y"/>
<configElement name="exchangeRef" access="RC" type="objId" index="y"/>
<instElement name="msgsProduced" type="count64"/>
<instElement name="bytesProduced" type="count64"/>
Consumer
<configElement name="destinationRef" access="RC" type="objId" index="y"/>
<configElement name="queueRef" access="RC" type="objId" index="y"/>
<instElement name="msgsConsumed" type="count64"/>
<instElement name="bytesConsumed" type="count64"/>
<instElement name="unackedMessages" type="hilo32" desc="Messages consumed
but not yet acked"/>
<method name="close"/>
Broker
public void createNewExchange(java.lang.String, java.lang.String, boolean);
public void unregisterExchange(java.lang.String);
public void createNewQueue(java.lang.String, java.lang.String, boolean);
public void deleteQueue(java.lang.String);
VirtualHost
public java.lang.String getName();
UserManagement
public boolean setPassword(java.lang.String, char[]);
public boolean setRights(java.lang.String, boolean, boolean, boolean);
public boolean createUser(java.lang.String, char[], boolean, boolean,
boolean);
public boolean deleteUser(java.lang.String);
public boolean reloadData();
public javax.management.openmbean.TabularData viewUsers();
public void
setPrincipalDatabase(org.apache.qpid.server.security.auth.database.PrincipalDatabase);
public void setAccessFile(java.lang.String);
Queue
public java.lang.String getName();
public boolean isDurable();
public java.lang.String getOwner();
public boolean isAutoDelete();
public java.lang.Integer getMessageCount();
public java.lang.Long getMaximumMessageSize();
public void setMaximumMessageSize(java.lang.Long);
public java.lang.Long getMaximumMessageAge();
public void setMaximumMessageAge(java.lang.Long);
public java.lang.Integer getConsumerCount();
public java.lang.Integer getActiveConsumerCount();
public java.lang.Long getReceivedMessageCount();
public java.lang.Long getMaximumMessageCount();
public void setMaximumMessageCount(java.lang.Long);
public java.lang.Long getMaximumQueueDepth();
public void setMaximumQueueDepth(java.lang.Long);
public java.lang.Long getQueueDepth();
public void checkForNotification(org.apache.qpid.server.queue.AMQMessage);
public void notifyClients(org.apache.qpid.server.queue.NotificationCheck,
org.apache.qpid.server.queue.AMQQueue, java.lang.String);
public javax.management.Notification getLastNotification();
public void deleteMessageFromTop();
public void clearQueue();
public javax.management.openmbean.CompositeData viewMessageContent(long);
public javax.management.openmbean.TabularData viewMessages(int, int);
public void moveMessages(long, long, java.lang.String);
public javax.management.MBeanNotificationInfo[] getNotificationInfo();
AbstractExchange
public java.lang.String getName();
public java.lang.String getExchangeType();
public boolean isDurable();
public boolean isAutoDelete();
DestNameExchange (i.e. direct exchange)
public javax.management.openmbean.TabularData bindings();
public void createNewBinding(java.lang.String, java.lang.String);
DestWildExchange (i.e. topic exchange)
public javax.management.openmbean.TabularData bindings();
public void createNewBinding(java.lang.String, java.lang.String);
FanoutExchange
public javax.management.openmbean.TabularData bindings();
public void createNewBinding(java.lang.String, java.lang.String);
HeadersExchange
public javax.management.openmbean.TabularData bindings();
public void createNewBinding(java.lang.String, java.lang.String);
Client Connection (called Session)
public java.lang.String getClientId();
public java.lang.String getAuthorizedId();
public java.lang.String getVersion();
public java.util.Date getLastIoTime();
public java.lang.String getRemoteAddress();
public java.lang.Long getWrittenBytes();
public java.lang.Long getReadBytes();
public java.lang.Long getMaximumNumberOfChannels();
public void setMaximumNumberOfChannels(java.lang.Long);
public void commitTransactions(int);
public void rollbackTransactions(int);
public javax.management.openmbean.TabularData channels();
public void closeConnection();
public javax.management.MBeanNotificationInfo[] getNotificationInfo();
public void notifyClients(java.lang.String);