Hello all,
Over the last couple of weekends I decided to make a concerted effort to try and get my head around the Qpid AMQP 1.0 support and to see if I can replicate a whole bunch of use cases that I was able to support in AMQP 0.10.

I've attached a write up of my experiments along with some observations that I made along the way. Be warned the attachment is *very long*.

Hopefully this will be useful to others who want to dip their toes in the AMQP 1.0 waters. One of the issues that I faced was the lack of solid and cohesive documentation for any of the AMQP 1.0 Address/Subscription information; hopefully the examples that I've included in the write up can serve as a starting point.

I'd be really grateful if those who know this stuff better than me could read through this and check whether my observations are accurate. I think that I might have discovered some quirks/defects/inconsistencies along the way, though I might just be misinterpreting things. It's probably best to fight your way all the way through as it gets increasingly adventurous.

Enjoy :-)

Best regards,
Frase
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
#

After spending a little while playing with AMQP 1.0 support in Qpid trying to
get my head around AMQP 1.0 Addressing and Messaging patterns using
qpid::messaging (via spout and drain) and Proton Messenger via the send & recv
programs it seemed worth jotting down some musings on what I've been seeing as a
way of trying to externalise and understand what's going on in my own mind and
to try and figure out differences between Qpid's AMQP 1.0 support and AMQP 0.10
support.

My understanding and interpretation of the AMQP 1.0 specification might well not
be accurate so it'd be great if people who do know can chip in and correct and
expand on some of the stuff that I've got here.

*******************************************************************************
************************** Simple amq.fanout example **************************

First let's start with a trivial example where we send and receive messages
through the amq.fanout node on the C++ Broker using Proton Messenger and
qpid::messaging.


Using drain to subscribe to the amq.fanout node via qpid::messaging looks like
this:
./drain --connection-options {protocol:amqp1.0} -b localhost -f "amq.fanout"


I can send a message addressed to amq.fanout from Proton Messenger like this:
./send -a amqp://localhost/amq.fanout

I can even send over a WebSocket from an async JavaScript port of send in
Node.js. (For the C++ Broker I'm cheating by using a WebSocket<->TCP Socket
proxy listening on 5673 but with the Java Broker I can do this directly - yay!!)
node send-async.js -a amqp://localhost:5673/amq.fanout

The equivalent of send using qpid::messaging via spout looks like this:
./spout --connection-options {protocol:amqp1.0} -b localhost \
  --content "Hello World" "amq.fanout"


These sends resulted in the following messages from drain:

1. From Proton Messenger send:
Message(properties={x-amqp-absolute-expiry-time:0, x-amqp-creation-time:0, x-amqp-group-sequence:0, x-amqp-to:amqp://localhost/amq.fanout}, content='Hello World!')

2. From JavaScript Proton send:
Message(properties={x-amqp-absolute-expiry-time:0, x-amqp-creation-time:0, x-amqp-group-sequence:0, x-amqp-to:amqp://localhost:5673/amq.fanout}, content='Hello World!')

3. From qpid::messaging spout:
Message(properties={spout-id:cf6c9ee4-93e8-42cf-af6b-cfff1632e5e8:0}, content='Hello World')



For completeness I used the Messenger recv program in parallel with drain above.
./recv amqp://localhost/amq.fanout

and noted that (as expected) both the drain and recv consumers received
messages. I did however notice the following results from recv:

1. From Proton Messenger send:
Address: amqp://localhost/amq.fanout
Subject: (no subject)
Content: "Hello World!"

2. From qpid::messaging spout:
Address: (null)
Subject: (no subject)
Content: "Hello World"

So only the Proton send example sets the Address. There's a line in send.c
  pn_message_set_address(message, address);

so that's not necessarily too surprising. The Address doesn't seem to be
explicitly set in spout, so again it not being present isn't *totally*
surprising until one considers the results I show a little later using AMQP 0.10
spout.



Trying:
./spout --connection-options {protocol:amqp1.0} -b localhost \
  -P x-amqp-to=amqp://localhost/amq.fanout --content "Hello World" "amq.fanout"

gave
Address: amqp://localhost/amq.fanout
Subject: (no subject)
Content: "Hello World"

So essentially the same result as I got sending with:
./send -a amqp://localhost/amq.fanout



Finally, for a little bit of fun I tried messing around with spout's broker URL
field to see if there were differences between 1.0 and 0.10.

./spout -b amqp://localhost --content "Hello World" "amq.fanout"
./spout --connection-options {protocol:amqp1.0} -b amqp://localhost \
  --content "Hello World" "amq.fanout"
both barf with "Invalid URL: amqp://localhost", which is kind of what I was
expecting, and

./spout -b amqp:localhost --content "Hello World" "amq.fanout"
./spout --connection-options {protocol:amqp1.0} -b amqp:localhost \
  --content "Hello World" "amq.fanout"
both succeed, but there were even some surprising observations with this....

When I looked at the results from recv I saw:

1. From qpid::messaging spout protocol AMQP 0.10:
Address: amq.fanout
Subject: 
Content: b"Hello World"

2. From qpid::messaging spout protocol AMQP 1.0:
Address: (null)
Subject: (no subject)
Content: "Hello World"

So it looks like the AMQP 0.10 qpid::messaging is setting x-amqp-to (albeit to
what amounts to the path part of the complete AMQP address) but the content is
subtly different!! I'm *guessing* that the b prefix stands for "binary" - I
*think* that Proton codec.c is applying the prefix in pni_inspect_atom in the
code

      case PN_BINARY:
        pfx = "b";
        quote = true;
        break;

But in any case it's interesting that the AMQP 1.0 implementation of
qpid::messaging seems to be encoding text as UTF8 by default but the AMQP 0.10
implementation defaults to binary strings. I think defaulting to UTF8 is good
for interoperability with Java et al., but it is a subtle little difference.
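
To make the "b" prefix a bit more concrete, here is a rough Python sketch of
what I *think* the inspect code is doing. It is not Proton's code: inspect_atom
below is a hypothetical stand-in that only mimics the PN_BINARY case quoted
above, using Python bytes for binary and str for UTF8 text.

```python
def inspect_atom(value):
    """Render a value roughly the way recv prints message content.

    Hypothetical stand-in for Proton's pni_inspect_atom: binary data gets a
    "b" prefix and quotes, text strings get quotes only.
    """
    if isinstance(value, bytes):
        # Mirrors the PN_BINARY case: pfx = "b", quote = true.
        return 'b"' + value.decode("latin-1") + '"'
    if isinstance(value, str):
        # A UTF8 string is quoted but carries no prefix.
        return '"' + value + '"'
    return repr(value)


# AMQP 0.10 spout content shows up as binary, AMQP 1.0 spout content as text.
print(inspect_atom(b"Hello World"))
print(inspect_atom("Hello World"))
```

The two print calls correspond to the two Content lines seen in recv above.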

******************* Observations on the amq.fanout example ********************

So even with one of the most trivial scenarios imaginable of clients
communicating via the amq.fanout exchange node there are a fair few observations
to be made:

            1. The qpid::messaging "x-amqp-" pseudo-property:

The "x-amqp-" stuff is interesting and I'm not entirely sure where it fits in.
I *think* that this is probably related to section 3.2.4 in the AMQP 1.0 spec.
This describes "immutable properties of the message" but the field names aren't
prefixed with "x-amqp-" in the spec, and grepping proton-c there's no "x-amqp-".

I *think* that it might well just be a qpid::messaging thing (JMS too??). The
qpid::messaging Message class has a bunch of accessors for properties like
get/setMessageId() but by the looks of it not for to/group-sequence/creation-time
etc., so my guess is that the "x-amqp-" bit is a way of mapping these things
from section 3.2.4 to the general getProperties()/setProperty() API in
qpid::messaging.
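
My guess at that mapping can be sketched in a few lines of Python. The field
names are the ones listed in section 3.2.4; everything else is an assumption
for illustration, in particular the set of fields that have dedicated accessors
and the function name to_application_properties, neither of which comes from
the qpid::messaging source.

```python
# Field names of the AMQP 1.0 "properties" section (3.2.4).
AMQP_PROPERTIES = [
    "message-id", "user-id", "to", "subject", "reply-to", "correlation-id",
    "content-type", "content-encoding", "absolute-expiry-time",
    "creation-time", "group-id", "group-sequence", "reply-to-group-id",
]

# ASSUMPTION: fields that the Message class exposes through dedicated
# accessors. The exact set is a guess, not taken from the Qpid code.
FIELDS_WITH_ACCESSORS = {
    "message-id", "user-id", "subject", "reply-to", "correlation-id",
    "content-type",
}


def to_application_properties(amqp_properties):
    """Surface fields without a dedicated accessor under "x-amqp-" keys."""
    result = {}
    for name, value in amqp_properties.items():
        if name not in AMQP_PROPERTIES:
            raise KeyError("not an AMQP 1.0 properties field: " + name)
        if name not in FIELDS_WITH_ACCESSORS:
            result["x-amqp-" + name] = value
    return result


# Roughly what Proton Messenger send put on the wire in the example above.
received = {
    "to": "amqp://localhost/amq.fanout",
    "creation-time": 0,
    "absolute-expiry-time": 0,
    "group-sequence": 0,
}
print(to_application_properties(received))
```

With that input the keys produced are the same four "x-amqp-" names that drain
displayed for the Proton Messenger message.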

One thing that is weird though is that these things aren't set by spout. Apart
from the to address they aren't explicitly set by send either, which suggests
that qpid::messaging::Message doesn't use message.c from Proton?? I say that
because in message.c pn_message_encode() calls pn_data_fill() with varargs that
contain all the standard properties whether they've been explicitly set or not,
but the drain output from the spout input doesn't have them set.

I also don't know whether setting the "x-amqp-to" by default in AMQP 0.10
qpid::messaging but not in AMQP 1.0 is accidental or deliberate. It's not part
of spout, so my guess is that the qpid::messaging internals are taking the node
part of the Address string and attempting to populate the address, but AMQP 1.0
requires the "x-amqp-" bit, which is not being set on the direct AMQP 1.0 path
but is being set on the AMQP 0.10 to AMQP 1.0 interoperability path, which would
explain why it can be seen in recv when using AMQP 0.10 spout???



            2. What's the nature of the queue we see when consuming from amq.fanout?

The most striking observation though is that establishing a receiver that has
amq.fanout as an address results in a temporary queue being established on the
Broker with both qpid::messaging and Proton Messenger used as clients.

OK it's not *entirely* surprising and is consistent with doing the same in AMQP
0.10:
./drain -b localhost -f "amq.fanout"

But it got me thinking about what this stuff *means* in the context of AMQP 1.0,
because I think that the *interpretation* of what's happening is subtly
different between AMQP 0.10 and AMQP 1.0.

In AMQP 0.10 we had the concepts of Exchanges, Queues and Bindings between the
two as a core part of the protocol and you had producer and consumer clients.
Producer clients could *only* talk to Exchanges and consumers could *only* talk
to Queues (which is why the "default direct" fudge existed, where you had a
direct Exchange that was bound to a Queue using a subject that mapped to the
Queue name, so producers could "pretend" that they were sending messages
directly on to a named Queue, providing a reasonable mapping to the JMS Queue
pattern).


But with AMQP 1.0 (as I understand it) things are a lot "looser" (in terms of
what the core protocol actually specifies/mandates). AMQP 1.0 does not define a
wire-level distinction between "clients" and "brokers"; the protocol is
symmetric.
http://docs.oasis-open.org/amqp/core/v1.0/amqp-core-complete-v1.0.pdf

from Section 2.1.1
"
An AMQP network consists of nodes connected via links. Nodes are named entities
responsible for the safe storage and/or delivery of messages. Messages can
originate from, terminate at, or be relayed by nodes. A link is a unidirectional
route between two nodes.

Examples of AMQP nodes are producers, consumers, and queues. Producers and
consumers are the elements within an application that generate and process
messages. Queues are entities that store and forward messages.
"

from Section 2.1.2
"
The AMQP transport specification defines a peer-to-peer protocol for
transferring messages between nodes in an AMQP network.

In order to transfer messages between nodes (e.g., to move messages from a queue
to a consumer) a link needs to be established between the nodes. A link is a
unidirectional route between two nodes.

Sessions provide the context for communication between sources and targets.
Within a session, the link protocol is used to establish links between sources
and targets and to transfer messages across them.
Links are named, and the state at the termini can live longer than the
connection on which they were established.
"

   Client App                                      Broker
+-------------+                                +-------------+
|             |################################|             |
|   +---+     |--------------------------------|    +---+    |
|   | C |O<=================+======================O| Q |    |
|   +---+ \   |-------------|---+--------------|   |+---+    |
|          \  |#############|###|#######+######|   |         |
+-----------\-+             |   |       |      +---|---------+
             \              |   |       |          |
             Target         |  Session  |          |
                            |           |        Source
                           Link     Connection


So what does all that mean for *interpretation*? Well I *think* that it means
that in AMQP 1.0, whilst "traditional" exchanges and queues *may* both be types
of AMQP 1.0 node, it is also the case that queues exist in another guise that is
closer to an implementation artefact of "topic" style exchange nodes (e.g.
amq.topic, amq.match, and amq.fanout for Qpid, but other AMQP 1.0
implementations may have different "topic" type nodes with similar behaviour and
features).

I think that one interpretation of AMQP 1.0 nodes is that they are "addressable
entities". Thus, in the amq.fanout example being considered, the queue that gets
created upon running
./drain --connection-options {protocol:amqp1.0} -b localhost -f "amq.fanout"

is not *actually* an AMQP 1.0 node, because the node being addressed is
amq.fanout and the queue is in fact what might be considered a "subscription
queue".

One slightly intriguing question remains as to *what* a "subscription queue" is
in AMQP 1.0 terms though. It doesn't seem to be a node, but nor, I think, is it
a link either - a link is a unidirectional route between two nodes and (as I
understand it) subscription queues can be "shared" to enable "load
balancing"/"scale out" over multiple physical instances of the same "logical"
consumer.

I think that in practice then a subscription queue is, in UML terms, an instance
of an association class linked off the 1..* association between source and link
and is created by topic like nodes (generally) upon link attachment.


The discussion on queues and whether they are nodes or not (or nodes or
"something else" depending on context) might seem to be a little bit of a
philosophical (or indeed slightly existential) discussion, but I think that it's
actually pretty important when one starts to look at much more complex use-cases
where we want to figure out how to implement more sophisticated messaging
patterns that were readily done using the topic/headers/xml exchanges.



            3. Where is the Subscription QMF Management Object?

Another slightly surprising observation from running the trivial
./drain --connection-options {protocol:amqp1.0} -b localhost -f "amq.fanout"
./recv amqp://localhost/
is that no "subscription" QMF Management Object seems to get created, whereas
one is created with the AMQP 0.10 form:
./drain -b localhost -f "amq.fanout"

I don't know whether that's accidental or deliberate (in this case arguably the
queue *IS* the subscription) but not having the Subscription Management Object
means that it's impossible for QMF applications to associate a queue with a
connection (or vice versa) because Subscription has sessionRef and queueRef
and Session has connectionRef - so one needs Subscription to navigate from a
Connection to a Queue.

To satisfy my curiosity I tried to create a dynamic node
./drain --connection-options {protocol:amqp1.0} -b localhost -f "#"

That doesn't have a Subscription QMF Management Object created either, nor does
./recv amqp://localhost/#

As an aside, specifying # in qpid::messaging seems to create the dynamic queue
as autodelete by default whereas in Proton Messenger it is not autodelete. This
is inconsistent and it probably ought to behave the same; I suspect that
autodelete is the correct default behaviour for an anonymous dynamically created
node.


I note in the management-schema.xml that there are new Incoming and Outgoing
Link Management Objects, but there's no documentation around them and I've not
looked hard at what they contain. They have a sessionRef which is to be expected
but I don't know (for the case of outgoing Links) whether the source holds the
node name (in this case amq.fanout) or whether it gets populated with the
subscription queue name.



            4. Could the qpid::messaging Connection URL be improved?

I mentioned earlier that the "-b" option of spout allows specifying the
qpid::messaging Connection URL. This currently accepts both a plain
<host>:<port> and what is (I believe) an AMQP 0.10 URL - amqp:<host>:<port>.
The latter is rarely used in examples, but the following works for both AMQP
0.10 and AMQP 1.0:

./spout -b amqp:localhost --content "Hello World" "amq.fanout"
./spout --connection-options {protocol:amqp1.0} -b amqp:localhost \
  --content "Hello World" "amq.fanout"

This got me thinking about whether it might be neater to expand the URL parsing
to support AMQP 1.0 style syntax. At the moment the following

./spout -b amqp://localhost --content "Hello World" "amq.fanout"
./spout --connection-options {protocol:amqp1.0} -b amqp://localhost \
  --content "Hello World" "amq.fanout"
both barf with "Invalid URL: amqp://localhost", which is kind of what I was
expecting.

At present AMQP 1.0 requires the slightly ugly connection option, however a
simple
./spout -b amqp://localhost --content "Hello World" "amq.fanout"

should be all that's really required, because the amqp:// is enough to
discriminate between an AMQP 1.0 URL and the AMQP 0.10 URL. Moreover this could
then follow similar patterns for username and password (and indeed transport
protocol) e.g.
./spout -b amqp://user:password@sctp:localhost:5672 --content "Hello World" \
  "amq.fanout"

This would (IMHO) give a cleaner syntax for expressing that we wish to use AMQP
1.0 and for the username/password stuff, and it feels a lot more consistent with
Proton Messenger (and I think with what I've seen of the Draft AMQP 1.0
Addressing Spec.)
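
As a rough sketch of the idea (in Python rather than in the qpid::messaging URL
parser), the discrimination could look something like the following. The
grammar amqp://[user[:password]@][transport:]host[:port] is only my reading of
the example above, and the function names and the "unspecified" label are made
up for illustration.

```python
DEFAULT_AMQP_PORT = 5672


def parse_authority(authority):
    """Parse [user[:password]@][transport:]host[:port] (hypothetical grammar)."""
    user = password = transport = port = None
    if "@" in authority:
        credentials, authority = authority.rsplit("@", 1)
        user, _, password = credentials.partition(":")
        password = password or None
    parts = authority.split(":")
    if len(parts) == 3:
        transport, host, port = parts
    elif len(parts) == 2 and parts[1].isdigit():
        host, port = parts
    elif len(parts) == 2:
        transport, host = parts
    elif len(parts) == 1:
        host = parts[0]
    else:
        raise ValueError("cannot parse: " + authority)
    if not host:
        raise ValueError("missing host")
    return {
        "user": user,
        "password": password,
        "transport": transport,  # None means "use the default transport"
        "host": host,
        "port": int(port) if port else DEFAULT_AMQP_PORT,
    }


def classify_broker_url(url):
    """amqp:// selects AMQP 1.0; anything else leaves the protocol unspecified."""
    if url.startswith("amqp://"):
        return "amqp1.0", parse_authority(url[len("amqp://"):])
    return "unspecified", {"url": url}


print(classify_broker_url("amqp://user:password@sctp:localhost:5672"))
print(classify_broker_url("amqp:localhost"))
```

The point is simply that the "//" is enough to tell the two forms apart, so the
existing amqp:<host>:<port> and <host>:<port> forms could keep working
unchanged.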



            5. How does ReplyTo work in qpid::messaging (and I guess JMS)

The previous musings on the qpid::messaging Connection URL got me wondering
about ReplyTo in qpid::messaging and JMS (Connection Oriented approaches).

What I mean is that AMQP 1.0 is essentially a peer to peer protocol and doesn't
have the Broker-centric view of the world that AMQP 0.10 did, so if a Message
source specifies a ReplyTo there is no fundamental reason for the response
Message to travel through any circuitous route that the original Message may
have traversed; it could be sent directly from the ultimate recipient back to
the originator (indeed that might be the preferred approach).

In Proton Messenger this is probably obvious, but in a Connection Oriented API
we have an interesting scenario where a ReplyTo address might well have a
Connection part that is completely different to any Connection that the consumer
client currently has established.

This seems slightly *awkward* as neither the qpid::messaging::Address nor the
JMS Destination have any way to retrieve any Receiver/MessageConsumer etc.

At a guess getReplyTo/getJMSReplyTo when called on a Message will result in some
parsing of the Address string passed and transparently create a "hidden"
Connection, Session, Sender/MessageProducer etc. (and possibly cache it)?

Creating the necessary Connection/Session etc. from a ReplyTo has also got me
wondering about the ability to construct Session etc. from a "complete" Address
e.g. amqp://<host>:<port>/path

Clearly at present that is not possible, but I'd guess that something similar is
needed to support arbitrary ReplyTo, so it might be interesting to consider a
complete Address in the Connection URL. I guess that in this case, when one
specifies a Sender/Receiver/MessageProducer/MessageConsumer, supplying a Null
Address or Destination could signify use of the "path" part of the "complete"
address specified in the URL (though I'm not sure what to do about "extended"
address parts like the link specification).

It seems worth some thought around ReplyTo and Connection URL semantics as there
is a decent argument for allowing similar semantics to Messenger, particularly
given the AMQP 1.0 Addressing Spec. and people's general familiarity with
<host>:<port>/path style of say HTTP URLs - and of course handling ReplyTo
effectively is a pretty critical use case in any messaging system. 
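
To illustrate the awkwardness, here is a small Python sketch of the check that
a Connection Oriented client would presumably have to make when it sees a
"complete" ReplyTo address. The helper names and the example hosts are
hypothetical; nothing here is taken from qpid::messaging or JMS.

```python
from urllib.parse import urlsplit

DEFAULT_AMQP_PORT = 5672


def split_address(address):
    """Split amqp://<host>:<port>/path into ((host, port), path)."""
    parts = urlsplit(address)
    if parts.scheme != "amqp" or not parts.hostname:
        raise ValueError("not a complete AMQP address: " + address)
    endpoint = (parts.hostname, parts.port or DEFAULT_AMQP_PORT)
    return endpoint, parts.path.lstrip("/")


def needs_new_connection(reply_to, current_endpoint):
    """True if reply_to names a peer other than the current Connection's."""
    endpoint, _ = split_address(reply_to)
    return endpoint != current_endpoint


current = ("localhost", 5672)
print(split_address("amqp://localhost:5673/amq.fanout"))
print(needs_new_connection("amqp://localhost/replies", current))
print(needs_new_connection("amqp://other.example.org/replies", current))
```

When needs_new_connection() is true the client either has to create (and
perhaps cache) a second Connection behind the scenes or give up on replying
directly, which is exactly the question raised above.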


*******************************************************************************
******************** More sophisticated amq.fanout example ********************

We've seen that there are a surprising number of observations that can be made
of the behaviour of even a trivial example, so let's expand that example a
little to try and see how things hang together.

In the observation on "what's the nature of the queue we see when consuming from
amq.fanout?" it seems that the subscription queue being created is not a node,
but rather it's an artefact of the association between a topic like node and
outbound links. The generated name is all ugly and UUIDy; let's specify the link
name.

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.fanout; {link: {name: test-link}}"

That's interesting as it creates a subscription queue of the form:
<UUID>_test-link

It seems that the first UUID is actually the Container ID of the consumer; sure
enough
./drain --connection-options "{protocol:amqp1.0, container-id:fadams}" \
-b localhost -f "amq.fanout; {link: {name: test-link}}"

gives me a queue named "fadams_test-link".
Unfortunately none of this stuff seems to be documented anywhere. I came across
it by accident by grepping around <qpid>/cpp/src/qpid/messaging - not ideal!!


Interestingly, running a Proton Messenger consumer
./recv amqp://localhost/amq.fanout

Creates a subscription queue of the form:
<UUID>_receiver-xxx

This suggests that qpid::messaging by default uses a UUID for the link name
whereas Proton Messenger uses "receiver-xxx" as its link name.

I've just noticed https://issues.apache.org/jira/browse/PROTON-491 that has a
headline "proton-c: Messenger uses hard-coded link names of sender-xxx &
receiver-xxx"


That troubled me a bit, so I grepped the Proton code and found that the key bit
of code seemed to be pn_messenger_link() in messenger.c.
That code seems to look for a link whose terminus name matches the name resolved
from the address, or if it can't find one it creates a link using pn_sender() or
pn_receiver() - indeed with hardcoded names. Moreover I can't see any reference
in messenger.c *anywhere* that suggests that it is possible to set properties or
capabilities (neither pn_terminus_properties nor pn_terminus_capabilities seems
to be called in messenger.c unless I've missed something)?????


Have I missed something? If Proton Messenger does not allow link name,
properties or capabilities to be set then it would seem impossible to do things
like apply filters/selectors, set up shared subscriptions or indeed use it for
any non-trivial messaging scenario. That seems a somewhat crippling limitation
if true.

###############################################################################
1. Can Proton Messenger be used for non-trivial messaging patterns?

2. If not, is it on the roadmap to be able to do this? Because if not it
   massively limits its potential usefulness and makes me sad :-(
###############################################################################

Another interesting thing that I noticed was that in the Proton Engine API I
could only find properties and capabilities related to connection and terminus,
whilst in the AMQP 1.0 Spec. there are capabilities allowed for connection,
session, link, source & target. Perhaps it's just that there are currently no
useful link or session capabilities so no need for an API yet? Mainly just
curious about that.
AMQP 1.0 Specification Section 0.1.3 Non-normative References has hyperlinks
e.g.
http://www.amqp.org/specification/1.0/link-capabilities
http://www.amqp.org/specification/1.0/link-properties
http://www.amqp.org/specification/1.0/source-capabilities
http://www.amqp.org/specification/1.0/target-capabilities
and a good few more, but these links don't lead to any useful information on
what properties and capabilities actually exist.



Anyway, to continue our journey into more sophisticated Address scenarios it
seems that we must abandon Proton Messenger for the moment and continue
exploring how qpid::messaging lets us control AMQP 1.0 to enable the messaging
patterns we'd like.



*******************************************************************************
***************** amq.fanout example with shared subscription *****************

One really useful thing to be able to do is to "scale out" Message receipt so
that we may for example distribute Messages across several physical instances of
a logical consumer, perhaps spanning multiple physical hosts.

This is pretty trivial to do where the node being addressed is a queue. I just
do
./drain --connection-options {protocol:amqp1.0} -b localhost -f "test"

however many times I like, and sure enough when I send to "test" the Messages
are nicely distributed between the consumers. But what about "topic like" nodes
and their "subscription queues"?

If I do:
./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.fanout; {link: {name: test-link}}"

multiple times I get multiple subscription queues and they all receive the same
Messages, which isn't what I want!!

Digging around a bit in <qpid>/cpp/src/qpid/broker/amqp/Session.cpp there's a
method setupOutgoing() with a block "else if (node.exchange) {"
....
bool shared = is_capability_requested(SHARED, pn_terminus_capabilities(source));


That seems to be checking for the string "shared" in the AMQP Array of source
capabilities.

The syntax below seems to work and create a shared subscription queue:
./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.fanout; {node: {capabilities: [shared]}, link: {name: test-link}}"

Note that this requires the capabilities to be set on the *node* not the link!
Now send messages to amq.fanout and they do indeed get distributed across the
consumer instances (as I'd hope). One other observation is that this syntax
creates a subscription queue with the simple name "test-link", that is to say
without the "container-id" bit. This is actually what I'd like, so is a nice
surprise.

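The behaviour I observed can be summarised with a toy Python model. This is
purely a sketch of my observations, not of the Broker's implementation: the
FanoutNode class is made up, and the round robin hand-out between consumers of
a shared queue is an assumption about how the distribution happens.

```python
class FanoutNode:
    """Toy model of a topic-like node and its subscription queues."""

    def __init__(self):
        # subscription queue name -> list of consumer inboxes
        self.consumers = {}

    def attach(self, container_id, link_name, shared=False):
        # Naming as observed with drain: a shared subscription uses the bare
        # link name, otherwise the name is <container-id>_<link-name>.
        name = link_name if shared else container_id + "_" + link_name
        inbox = []
        self.consumers.setdefault(name, []).append(inbox)
        return name, inbox

    def publish(self, message):
        # Fanout: every subscription queue gets the message, and each queue
        # hands it to exactly one of its consumers (ASSUMED round robin).
        for consumers in self.consumers.values():
            inbox = consumers.pop(0)
            inbox.append(message)
            consumers.append(inbox)


node = FanoutNode()
q1, a = node.attach("c1", "test-link")
q2, b = node.attach("c2", "test-link")
q3, c = node.attach("c3", "test-link", shared=True)
q4, d = node.attach("c4", "test-link", shared=True)
for i in range(4):
    node.publish("m%d" % i)

print(q1, a)  # own queue, sees every message
print(q2, b)  # own queue, sees every message
print(q3, c)  # shares "test-link" with d, sees half of the messages
print(q4, d)
```

Consumers a and b model two plain drain instances (each with its own
container-id), while c and d model two drain instances that asked for the
"shared" capability.
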


With the example above though the subscription queue is created "autodelete", so
if my consumers die on the job any messages intended for them fall on the floor.
That's not ideal in many scenarios, so is it possible to prevent this?

This was relatively easy to achieve in AMQP 0.10, but the correct syntax to
achieve the desired behaviour in AMQP 1.0 is unclear. Doing:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.fanout; {node: {capabilities: [shared]}, link: {name: fadams-data, properties: {auto-delete: false}}}"

fails with "Error: Unrecognised option: properties", so the "traditional"
auto-delete flag behaves differently. It looks like it is not supported at all
on links - I found the following in AddressHelper.cpp
"Link scoped x-declare element not supported over AMQP 1.0.".



So how does one prevent subscription queues from being deleted when connections
close?
In the AMQP 1.0 Specification section 3.5 there seem to be Source fields that
may help: "expiry-policy" and "timeout". Section 3.5.6 (Terminus Expiry Policy)
lists: link-detach, session-end, connection-close, never. So an Expiry Policy of
never looks like it might be what we need. Unfortunately ..........
I couldn't see anything in the qpid::messaging code related to expiry. I *think*
the mechanism exists in Proton as I found "pn_terminus_set_expiry_policy" but
that call doesn't seem to be used in qpid::messaging.

I did however come across a call "pn_terminus_set_timeout(terminus, timeout);"
and sure enough setting up consumers to do:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.fanout; {node: {capabilities: [shared]}, link: {name: test-link, timeout: 60}}"

will result in the subscription queue expiring 60 seconds after the last
connection closes. To be honest that's pretty neat and much clearer than having
to set the "qpid.auto_delete_timeout" in AMQP 0.10, but it'd be nice to have the
ability to set the Expiry Policy too. Is this just a TODO or is there a more
fundamental reason?
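
For clarity, the timeout behaviour that I observed can be written down as a
tiny Python model. The SubscriptionQueue class is hypothetical and only encodes
what I saw (the clock starts when the last consumer goes away and is cancelled
if one comes back); I have not checked how the Broker actually implements it.

```python
class SubscriptionQueue:
    """Toy model of a subscription queue created with link timeout N."""

    def __init__(self, timeout):
        self.timeout = timeout      # seconds, as in "timeout: 60"
        self.consumers = 0
        self.expires_at = None      # None while at least one consumer exists

    def attach(self):
        self.consumers += 1
        self.expires_at = None      # ASSUMPTION: re-attaching cancels expiry

    def detach(self, now):
        self.consumers -= 1
        if self.consumers == 0:
            # The clock only starts once the last consumer has gone.
            self.expires_at = now + self.timeout

    def is_expired(self, now):
        return self.expires_at is not None and now >= self.expires_at


queue = SubscriptionQueue(timeout=60)
queue.attach()
queue.attach()
queue.detach(now=100)          # one consumer left, no expiry pending
queue.detach(now=200)          # last consumer gone, expires at 260
print(queue.is_expired(259), queue.is_expired(260))
```

An Expiry Policy of never would, in these terms, simply mean that expires_at
is never set.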


*******************************************************************************
********************* So how do I configure a queue then? *********************

From what I've discovered so far it seems that the answer to this question
depends on whether the queue is a node or whether it is a "subscription queue".

For the case where the queue is a node it looks like it is possible to use a
similar approach to that used in AMQP 0.10, for example:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"test; {create: receiver, node: {properties: {auto-delete: false, 'qpid.max_size': 1000000, 'qpid.policy_type': ring}}}"

Creates a queue called test with the specified properties; the "properties" Map
is used in place of the old "x-declare" (though that still seems to be
supported).

A temporary auto-delete ring queue of capacity 1000000 may be created as
follows:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"#; {node: {properties: {'qpid.max_size': 1000000, 'qpid.policy_type': ring}}}"


So far so simple, but with AMQP 0.10 it was possible to create a queue node with
specified parameters and pass configuration to bind that with a given exchange
but with AMQP 1.0 it seems that:
"Node scoped x-bindings element not supported over AMQP 1.0."
"Link scoped x-bindings element not supported over AMQP 1.0."
"Link scoped x-subscribe element not supported over AMQP 1.0."
"Link scoped x-declare element not supported over AMQP 1.0."

In other words we can't have both in one address, we can either have a custom
queue or the binding - not both. The only way to have a binding created when
attaching a link is to create a receiver link from an exchange, in which case
the subscription queue is created automatically and bound based on any supplied
filter (I think).


*******************************************************************************
************** So how do I configure a subscription queue then? ***************

As I understand it a fairly neat way around this potential limitation exists in
the form of the concept of "topics". Topics are a sort of pseudo node that holds
the named pairing of an exchange and a set of subscription queue configuration.
When creating a receiver from that node the specified configuration is used,
which gets around the lack of a mechanism in the protocol to specify that
information on the attach and also centralises the control over policy.

In Qpid 0.26 onwards qpid-config has been updated to allow adding objects other
than just queues, exchanges and bindings e.g.

qpid-config [OPTIONS] add <type> <name> [--argument <property-name>=<property-value>]
qpid-config [OPTIONS] del <type> <name>
qpid-config [OPTIONS] list <type> [--show-property <property-name>]

Though unfortunately the qpid-config help doesn't give any examples for the new
types.

After playing around for a while I *eventually* figured out that the following
syntax will create a topic called fanout that uses the exchange amq.fanout and
creates circular (ring) subscription queues of size 1000000.

qpid-config add topic fanout --argument exchange=amq.fanout \
--argument qpid.max_size=1000000 --argument qpid.policy_type=ring

qpid-config list topic
gives:

Objects of type 'topic'
  name    durable  properties                                                    exchangeRef
  ==========================================================================================
  fanout  False    {u'qpid.max_size': u'1000000', u'qpid.policy_type': u'ring'}  amq.fanout


Changing our previous shared subscription queue example to use our fanout as
an address instead of amq.fanout.

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"fanout; {node: {capabilities: [shared]}, link: {name: test-link, timeout: 60}}"

This yields a queue called "test-link" with the policy we specified in the
fanout topic.
If we stand up another identical instance of drain in another window we see the
registered consumer count rise to 2. Now running:

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "amq.fanout"

multiple times and observing our drain instances shows that the messages are
being distributed across the consumer instances. Note that we can use *either*
the exchange name amq.fanout or the topic name fanout as the producer address,
which illustrates that the topic is really a "pseudo node" rather than a true
node.

In summary, by using the topic pseudo node as a factory we can achieve shared
subscription queues with specified names and configurable properties.



One slightly surprising thing that I noticed though was when I created a topic:

qpid-config add topic fanout --argument exchange=amq.fanout \
--argument qpid.max_size=1000000 --argument qpid.policy_type=ring \
--argument autoDelete=false

This correctly set the max size and policy of the subscription queue, but the
autoDelete of the subscription queue remained set to true. I'm not sure if this
is by accident or design, but it seemed a bit surprising.



By way of variety it is also possible to use qpid-ctrl to create topic objects:

qpid-ctrl create type=topic name=fanout \
properties="{'exchange':'amq.fanout','qpid.max_size':1000000, \
'qpid.policy_type':'ring'}"


*******************************************************************************
************** More advanced subscriptions - getting into a bind **************

To recap, so far we've reached a point where we know how to create queue nodes
on demand in an equivalent way to AMQP 0.10, and moreover we've worked out how
we can use topic-like nodes as our address and have them create subscription
queues in a configurable and predictable way. But in AMQP 0.10 our consumers
could selectively retrieve messages based on message properties via the
bindings that existed between AMQP 0.10 exchanges and queues. In AMQP 1.0 these
concepts don't exist, so how can I filter my messages?

It turns out that a filter is exactly the answer. AMQP 1.0 has the concept of
filters that may be applied to a message source.


Let's first try publishing to and subscribing to a topic exchange using
subjects.

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.topic/USA.news; {link: {name: test-link}}"

Doing
./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "amq.topic"

as expected yields nothing at the consumer, however

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "amq.topic/USA.news"

successfully delivers the message, again as we'd intuitively expect. Changing the
consumer subscription to include a wildcard:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.topic/*.news; {link: {name: test-link}}"

Causes it to successfully receive messages from both:
./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "amq.topic/USA.news"
./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "amq.topic/UK.news"

but not from:
./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "amq.topic/UK.weather"
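
The wildcard behaviour seen here can be mimicked in a few lines. The sketch
below assumes the usual AMQP topic exchange convention, in which "*" matches
exactly one dot-separated word and "#" matches zero or more words. The function
name topic_match is made up for illustration and this is not the broker's
actual matching code.

```python
def topic_match(pattern, key):
    """Return True if a dotted routing key matches a topic binding pattern.

    '*' matches exactly one word and '#' matches zero or more words,
    following the usual AMQP topic exchange convention.
    """
    p_words = pattern.split(".")
    k_words = key.split(".")

    def match(pi, ki):
        if pi == len(p_words):
            # Pattern exhausted: succeed only if the key is exhausted too.
            return ki == len(k_words)
        if p_words[pi] == "#":
            # '#' may swallow zero or more of the remaining words.
            return any(match(pi + 1, k) for k in range(ki, len(k_words) + 1))
        if ki == len(k_words):
            return False
        if p_words[pi] in ("*", k_words[ki]):
            return match(pi + 1, ki + 1)
        return False

    return match(0, 0)


for subject in ("USA.news", "UK.news", "UK.weather"):
    print(subject, topic_match("*.news", subject))
```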


None of this behaviour is really terribly surprising at first glance, but it's
worth considering what's *actually* going on because in AMQP 1.0 there are a few
traps in this for the unwary.

In the "Programming in Apache Qpid" book section 2.4.4 we see the Address
grammar:

address := name [ SLASH subject ] [ ";" options ]
name := ( part | quoted )+
subject := ( part | quoted | SLASH )*
quoted := STRING / ESC

so the qpid::messaging Address Strings infer a subject if a slash is present. In
most cases this is likely to be reasonable, but in AMQP 1.0 it has the potential
to be slightly confusing for the unwary.

Let's have a little fun with some "interestingly named nodes".

qpid-config add queue test/queue

really *does* add a queue node called test/queue

If we use Proton Messenger we can send a message to test/queue
./send -a amqp://localhost/test/queue

This works fine, though one thing to note from that is that Messenger makes no
inference about the subject from the address, which would have to be explicitly
added via a call to pn_message_set_subject

But qpid::messaging *does* infer the subject, so things get interesting if we
try to do the same thing via spout:
./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "test/queue"

This *does not* send a message to our test/queue queue; it *actually* sends a
message to a node called test with the subject set to queue. If we add a queue
called test we can see that its message count increments when we try the last
spout example.

Returning to the qpid::messaging Address grammar we see we can quote the name:
./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "'test/queue'"

This does indeed work, but it illustrates that there are some quite subtle
differences between Proton Messenger and qpid::messaging addressing.
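
The name/subject inference can be illustrated with a deliberately simplified
splitter. It is a sketch only: it handles just single-quoted names, returns the
options as an unparsed string, and the function name split_address is
hypothetical rather than anything from qpid::messaging.

```python
def split_address(address):
    """Split 'name[/subject][; options]' into (name, subject, options).

    Simplified: only single-quoted names are handled, and the options
    part is returned as an unparsed string.
    """
    head, sep, options = address.partition(";")
    head = head.strip()
    options = options.strip() if sep else None
    if head.startswith("'"):
        # Quoted name: everything up to the closing quote is the name,
        # so a slash inside the quotes does not start a subject.
        end = head.index("'", 1)
        name, rest = head[1:end], head[end + 1:]
        subject = rest[1:] if rest.startswith("/") else None
    else:
        # Unquoted: the first slash separates the name from the subject.
        name, slash, subject = head.partition("/")
        subject = subject if slash else None
    return name, subject, options


print(split_address("test/queue"))
print(split_address("'test/queue'"))
```

The first call reports a node called test with the subject queue, the second a
node called test/queue with no subject, which mirrors the behaviour seen with
spout.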


We've discovered that the sender Address turns out to be not as simple as
it first appears, so let's revisit the subscriber address:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.topic/*.news; {link: {name: test-link}}"


What is this *actually* doing "under the hood"?

If we first try the following (without explicitly setting a subject
subscription):
./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.topic; {link: {name: test-link}}"

we see that
./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "amq.topic/USA.news"

and
./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "amq.topic"

both successfully result in messages being delivered. This is what we'd expect
as we are simply sending to and receiving from the amq.topic node with no
subscription filter set.

The *trick* is to set the link's filter property (which is what qpid::messaging
does "under the hood" when a subject is set, see
<qpid>/cpp/src/qpid/messaging/AddressHelper.cpp).

AddressHelper.cpp makes it easy, but for a bit of fun let's try to do it the
hard way. The filters are actually specified as AMQP 1.0 extensions in the
following document:

https://svn.apache.org/repos/asf/qpid/trunk/qpid/specs/apache-filters.xml


So to roll our own subject filter (plundering AddressHelper.cpp to reverse
engineer the required syntax) we can do:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.topic; {link: {name: test-link, filter: {name: 'subject-filter', \
descriptor: 'apache.org:legacy-amqp-topic-binding:string', value: '*.news'}}}"

Again causes it to successfully receive messages from both:
./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "amq.topic/USA.news"
./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "amq.topic/UK.news"

but not from:
./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "amq.topic/UK.weather"

If we do:
qpid-config -r queues

Queue 'dbc6504c-9c83-4022-9024-c3fe80253acb_test-link'
    bind [dbc6504c-9c83-4022-9024-c3fe80253acb_test-link] => ''
    bind [*.news] => amq.topic

So we can see that the binding is indeed what we expect it to be.
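
To make the equivalence between the two forms concrete, the short sketch below
rewrites a "node/subject" pair into the explicit filter form used above. The
helper name subject_filter_address is hypothetical; the filter name and
descriptor are simply the ones taken from the drain example, and the function
only formats a string.

```python
def subject_filter_address(node, subject, link_name):
    """Format an address string carrying an explicit subject filter."""
    link = ("{name: %s, filter: {name: 'subject-filter', "
            "descriptor: 'apache.org:legacy-amqp-topic-binding:string', "
            "value: '%s'}}" % (link_name, subject))
    return "%s; {link: %s}" % (node, link)


# Explicit equivalent of "amq.topic/*.news; {link: {name: test-link}}"
address = subject_filter_address("amq.topic", "*.news", "test-link")
print(address)
```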


*******************************************************************************
************* More advanced subscriptions - headers subscriptions *************

The previous section illustrated creating bindings to the topic exchange both
the easy way and the hard way; this section extends that to show how we can
filter based on the properties of a message using the headers exchange and
selectors.

Let's try to replicate legacy x-bindings that had the following arguments.
arguments: {x-match: all, data-service: amqp-delivery, item-owner: fadams}

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.match; {link: {name: test-link, filter: {name: key, \
descriptor: 'apache.org:legacy-amqp-headers-binding:map', \
value: {x-match: all, data-service: amqp-delivery, item-owner: fadams}}}}"

If we do:
qpid-config -r queues

Queue 'a06f7d65-b0be-49bc-a36c-cf92f08aa196_test-link'
    bind [a06f7d65-b0be-49bc-a36c-cf92f08aa196_test-link] => ''
    bind [] => amq.match {u'item-owner': 'fadams', u'data-service': 'amqp-delivery', u'x-match': 'all'}

We can see that the binding has been added and if we do:

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" -P item-owner=fadams -P data-service=amqp-delivery \
"amq.match"

the message gets successfully delivered, but if we do

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" -P item-owner=fadams -P data-service=http-delivery \
"amq.match"

the message does *not* get delivered because the x-match: all in the binding
means that we require all the specified binding properties to match.
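
The matching rule can be written down as a small predicate. This is a sketch of
the general headers exchange semantics ("all" needs every listed property to
match, "any" needs at least one), not the broker's code. Treating every key
that starts with "x-" as a control argument, and defaulting to "all" when
x-match is absent, are assumptions made for the illustration.

```python
def headers_match(binding_args, message_properties):
    """Evaluate a headers binding against a message's properties.

    'x-match: all' requires every listed property to match;
    'x-match: any' requires at least one. Keys starting with 'x-'
    are treated as binding controls, not as properties to compare.
    """
    mode = binding_args.get("x-match", "all")
    wanted = {k: v for k, v in binding_args.items() if not k.startswith("x-")}
    hits = [message_properties.get(k) == v for k, v in wanted.items()]
    return all(hits) if mode == "all" else any(hits)


binding = {"x-match": "all", "data-service": "amqp-delivery",
           "item-owner": "fadams"}
print(headers_match(binding, {"item-owner": "fadams",
                              "data-service": "amqp-delivery"}))
print(headers_match(binding, {"item-owner": "fadams",
                              "data-service": "http-delivery"}))
```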


In general then it looks like this filter can effectively emulate legacy headers
bindings in AMQP 1.0, but wait................

If we look again at the output of qpid-config -r queues we notice that the
binding key has not been specified. I would have *expected* it to be set to
"key" because that was the name of the filter that we supplied.

###############################################################################

Should the binding key for the legacy headers bindings be set to the filter
name? Is this a bug or is it a "feature"??

###############################################################################

There is something of a problem if the headers binding key does not get set as
it means that we cannot have multiple bindings set between amq.match and the
subscription queue. In AMQP 0.10 x-bindings were in the form of a list and the
following was possible allowing basic "OR" logic to be specified e.g.:

x-bindings: [{exchange: 'amq.match', queue: 'testqueue', key: 'key1',
              arguments: {x-match: all, data-service: amqp-delivery,
                          item-owner: fadams}},
             {exchange: 'amq.match', queue: 'testqueue', key: 'key2',
              arguments: {x-match: all, data-service: http-delivery,
                          item-owner: fadams}}]

in this case the binding would match in the case of
(data-service=amqp-delivery AND item-owner=fadams) OR
(data-service=http-delivery AND item-owner=fadams)

It was often useful to be able to do this sort of thing in AMQP 0.10.
In theory the equivalent of this using the Legacy Amqp Headers Binding should
be:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.match; {link: {name: test-link, filter: [\
{name: key1, descriptor: 'apache.org:legacy-amqp-headers-binding:map', \
value: {x-match: all, data-service: amqp-delivery, item-owner: fadams}}, \
{name: key2, descriptor: 'apache.org:legacy-amqp-headers-binding:map', \
value: {x-match: all, data-service: http-delivery, item-owner: fadams}}]}}"

Unfortunately however in this case only the second binding is actually
registered because the binding key is empty rather than being populated from
the filter names.
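
A toy model shows why an empty binding key would produce exactly this symptom.
It assumes that bindings between one exchange and one queue are distinguished
by their key alone, which is how the observed behaviour reads but has not been
checked against the broker source. The function name register_bindings is
hypothetical.

```python
def register_bindings(filters, use_filter_name_as_key):
    """Toy model: bindings on one queue, stored by binding key."""
    bindings = {}
    for f in filters:
        key = f["name"] if use_filter_name_as_key else ""
        # A later binding with the same key replaces the earlier one.
        bindings[key] = f["value"]
    return bindings


filters = [
    {"name": "key1", "value": {"x-match": "all",
                               "data-service": "amqp-delivery",
                               "item-owner": "fadams"}},
    {"name": "key2", "value": {"x-match": "all",
                               "data-service": "http-delivery",
                               "item-owner": "fadams"}},
]
print(len(register_bindings(filters, use_filter_name_as_key=False)))
print(len(register_bindings(filters, use_filter_name_as_key=True)))
```

With an empty key only the http-delivery binding survives; keyed by filter name
both bindings coexist, which is what the "OR" use case needs.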


*******************************************************************************
*************** More advanced subscriptions - XML subscriptions ***************

There doesn't seem to be any "convenience" mechanism for specifying an xquery
based filter, so I think that we have to specify it explicitly using the link
filter method. In addition, in order to do xquery based filtering we have to
explicitly create an xml exchange on the broker.

qpid-config add exchange xml xml

For simplicity let's try to reproduce Example 2.11, "Using the XML Exchange for
AMQP 1.0", from Programming in Apache Qpid.

This example has the xquery specified as:

let $w := ./weather
return $w/station = 'Raleigh-Durham International Airport (KRDU)'
and $w/temperature_f > 50
and $w/temperature_f - $w/dewpoint > 5
and $w/wind_speed_mph > 7
and $w/wind_speed_mph < 20

and stored in a file rdu.xquery, with a subscription in the form:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"xml; {link: {name: test-link, filter: {name: 'weather', \
descriptor: 'apache.org:xquery-filter:string', value: \"$(cat rdu.xquery )\"}}}"

but I couldn't get that form to work, so I ended up doing it like this:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"xml; {link: {name: test-link, filter: {name: 'weather', \
descriptor: 'apache.org:xquery-filter:string', value: \" \
./weather/station = 'Raleigh-Durham International Airport (KRDU)' \
and ./weather/temperature_f > 50 \
and ./weather/temperature_f - ./weather/dewpoint > 5 \
and ./weather/wind_speed_mph > 7 \
and ./weather/wind_speed_mph < 20 \
\"}}}"

which did successfully create the subscription, though note that I couldn't get
the let $w := ./weather stuff to work so I explicitly used ./weather. I'm not
especially knowledgeable on xquery, so it'd be interesting to know what I may
have done wrong.

If we use the following sender:

./spout --connection-options {protocol:amqp1.0} -b localhost --content " \
<weather> \
<station>Raleigh-Durham International Airport (KRDU)</station> \
<wind_speed_mph>16</wind_speed_mph> \
<temperature_f>70</temperature_f> \
<dewpoint>35</dewpoint> \
</weather> \
" "xml"

we successfully see the following in drain:

Message(properties={spout-id:41b0c8f7-69f2-472d-81c1-3e7bda331761:0}, content=' 
<weather> <station>Raleigh-Durham International Airport (KRDU)</station> 
<wind_speed_mph>16</wind_speed_mph> <temperature_f>70</temperature_f> 
<dewpoint>35</dewpoint> </weather> ')


It's worth bearing in mind that, like the legacy headers binding, the filter
block does *not* map the filter name to the xml exchange binding key (I think
that it should!). In the original example the sender address from spout is
actually "xml/weather", but this doesn't work here because the binding key
hasn't been populated. I think that this is something that needs to be looked at
for both the headers and xml exchange legacy bindings.
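
When debugging a filter like this it can help to check a candidate message
against the same five conditions outside the broker. The sketch below does that
with Python's standard ElementTree module rather than an XQuery engine, so it
says nothing about why the "let" form failed. The function name
weather_matches is hypothetical, and a message with a missing or non-numeric
element raises an error instead of returning False.

```python
import xml.etree.ElementTree as ET


def weather_matches(xml_text):
    """Apply the same five conditions as the xquery, in plain Python."""
    root = ET.fromstring(xml_text.strip())
    if root.tag != "weather":
        return False

    def number(tag):
        # Child element text converted to a number for the comparisons.
        return float(root.findtext(tag))

    return (root.findtext("station")
            == "Raleigh-Durham International Airport (KRDU)"
            and number("temperature_f") > 50
            and number("temperature_f") - number("dewpoint") > 5
            and 7 < number("wind_speed_mph") < 20)


message = """
<weather>
<station>Raleigh-Durham International Airport (KRDU)</station>
<wind_speed_mph>16</wind_speed_mph>
<temperature_f>70</temperature_f>
<dewpoint>35</dewpoint>
</weather>
"""
print(weather_matches(message))
```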


*******************************************************************************
*************** More advanced subscriptions - message selectors ***************

Let's try to create a subscription equivalent to the initial headers binding.
The Selector Filter has a convenience syntax in qpid::messaging simply using the
selector property of link. Note that although we could use amq.match as the
node address it's probably better to simply use amq.fanout to avoid using any
additional matching algorithm.


./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.fanout; {link: {name: test-link, \
selector: \"data-service='amqp-delivery' and item-owner='fadams'\"}}"

If we do:

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" -P item-owner=fadams -P data-service=amqp-delivery \
"amq.fanout"

we'd hope the message would be successfully delivered, but unfortunately it is
not! If however we replace the hyphen in the property names with an underscore
as below:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.fanout; {link: {name: test-link, \
selector: \"data_service='amqp-delivery' and item_owner='fadams'\"}}"

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" -P item_owner=fadams -P data_service=amqp-delivery \
"amq.fanout"

then the message is delivered as expected and with

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" -P item_owner=fadams -P data_service=http-delivery \
"amq.fanout"

the message (correctly) does not get delivered because the data_service 
property value doesn't match.

###############################################################################

It seems that property names containing a hyphen will not work with selectors?

Digging around a little more on this I discovered something in the JMS Spec for
selectors that says "An identifier is an unlimited-length sequence of letters
and digits, the first of which must be a letter. A letter is any character for
which the method Character.isJavaLetter returns true. This includes '_' and '$'.
A letter or digit is any character for which the method
Character.isJavaLetterOrDigit returns true." I think that this is saying that
property names must be valid Java identifiers. TBH I hadn't actually noticed
that before :-(
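
A quick way to audit a set of property names against that rule is sketched
below. The pattern is an ASCII-only approximation of the JMS identifier rule
(Java accepts many more Unicode letters), it ignores the selector's reserved
words, and both function names are hypothetical.

```python
import re

# ASCII approximation of the JMS identifier rule: a letter, '_' or '$'
# first, then letters, digits, '_' or '$'. Java itself accepts many
# more Unicode letters than this pattern does.
_IDENTIFIER = re.compile(r"[A-Za-z_$][A-Za-z0-9_$]*\Z")


def is_selector_identifier(name):
    """True if the property name looks usable as a selector identifier."""
    return _IDENTIFIER.match(name) is not None


def unusable_in_selector(property_names):
    """Return the names that would need renaming before use in a selector."""
    return [n for n in property_names if not is_selector_identifier(n)]


print(unusable_in_selector(["data-service", "item-owner",
                            "data_service", "item_owner"]))
```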

However the AMQP Specification seems a little bit looser, section 3.2.5 says:
"The application-properties section is a part of the bare message used for
structured application data. Intermediaries can use the data within this
structure for the purposes of filtering or routing. The keys of this map are
restricted to be of type string (which excludes the possibility of a null key)
and the values are restricted to be of simple types only, that is, excluding
map, list, and array."

From a practical perspective not allowing properties that include a hyphen is
a real pain for my particular set of use cases, where I have a number of
properties that include hyphens. Being able to migrate my system to AMQP 1.0
would be great, but having to change all of my message producers, of which
there are a large number, to replace the hyphens with underscores in the
property names would be something of an integration nightmare as they certainly
cannot all be changed at once.


It'd be really good to have a workaround for this problem. I can understand the
hyphen being an issue as it could get confused with a minus in the selector, but
even quoting the property name doesn't currently work.

Interestingly:
https://svn.apache.org/repos/asf/qpid/trunk/qpid/specs/apache-filters.xml
Section 2.2 Selector Filter actually says:

The "properties" of the JMS message are equivalent to the AMQP
application-properties section.

Which, in theory, suggests an unconstrained string, but that would seem to
conflict with the JMS Specification in this case.

Not having a workaround for hyphens is a bit of a show-stopper (or is that
show_stopper) for me at the moment, at best it will make migrating to AMQP 1.0
much harder than I'd like it to be.

###############################################################################


Having got hyphen related angst out of the way, on the plus side message
selectors are cool, so let's revisit the slightly more interesting headers
binding from before.

x-bindings: [{exchange: 'amq.match', queue: 'testqueue', key: 'key1',
              arguments: {x-match: all, data-service: amqp-delivery,
                          item-owner: fadams}},
             {exchange: 'amq.match', queue: 'testqueue', key: 'key2',
              arguments: {x-match: all, data-service: http-delivery,
                          item-owner: fadams}}]

in this case the binding would match in the case of
(data-service=amqp-delivery AND item-owner=fadams) OR
(data-service=http-delivery AND item-owner=fadams)

After replacing the hyphens with underscores this becomes:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"amq.fanout; {link: {name: test-link, selector: \"\
(data_service='amqp-delivery' and item_owner='fadams') or \
(data_service='http-delivery' and item_owner='fadams')\"}}"

in this case both of the following producer cases successfully deliver messages

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" -P item_owner=fadams -P data_service=amqp-delivery \
"amq.fanout"

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" -P item_owner=fadams -P data_service=http-delivery \
"amq.fanout"

but (for example)

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" -P item_owner=jadams -P data_service=http-delivery \
"amq.fanout"

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" -P item_owner=fadams -P data_service=ftp-delivery \
"amq.fanout"

do not result in messages being delivered (as we would expect).

So (aside from the hyphen issue) it would seem that using message selectors can
easily replace even complex headers bindings with a much simpler and
significantly more expressive syntax.
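
For reference, the selector expression maps directly onto an ordinary boolean
predicate. The sketch below is a hand translation into Python for checking the
four producer cases above; it does not parse selector syntax, and treating a
missing property as a non-match is an assumption chosen to line up with the
observed results.

```python
def selector(props):
    """Python rendering of the selector expression used with drain."""
    return ((props.get("data_service") == "amqp-delivery"
             and props.get("item_owner") == "fadams")
            or (props.get("data_service") == "http-delivery"
                and props.get("item_owner") == "fadams"))


cases = [
    {"item_owner": "fadams", "data_service": "amqp-delivery"},
    {"item_owner": "fadams", "data_service": "http-delivery"},
    {"item_owner": "jadams", "data_service": "http-delivery"},
    {"item_owner": "fadams", "data_service": "ftp-delivery"},
]
for props in cases:
    print(props, selector(props))
```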


*******************************************************************************
*************************** Bringing it all together **************************

Let's get really adventurous now and create a shared subscription that uses both
topic style subject filtering *and* message selectors, which has a circular
subscription queue with a maximum size of 1000000 bytes.

Firstly we need to create a topic pseudo node to act as a factory for
subscription queues created on link attachment to amq.topic:

qpid-config add topic topic --argument exchange=amq.topic \
--argument qpid.max_size=1000000 --argument qpid.policy_type=ring

qpid-config list topic
Objects of type 'topic'
  name   durable  properties                                                    exchangeRef
  =========================================================================================
  topic  False    {u'qpid.max_size': u'1000000', u'qpid.policy_type': u'ring'}  amq.topic


Now let's create our subscription, run the following in two separate windows:

./drain --connection-options {protocol:amqp1.0} -b localhost -f \
"topic/*.news; {node: {capabilities: [shared]}, link: {name: data-queue, \
selector: \"(data_service='amqp-delivery' and item_owner='fadams') or \
(data_service='http-delivery' and item_owner='fadams')\"}}"


qpid-config -r queues
Queue 'data-queue'
    bind [data-queue] => ''
    bind [*.news] => amq.topic

So we can see that the topic subject binding is in place binding to amq.topic.
We can use qpid-tool (or the GUI) to verify that the configuration for the
queue "data-queue" is as we expect it and that there are two consumers present.

Let's first send a message that should match the subject, but has no additional
properties set.

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" "amq.topic/USA.news"

as expected neither consumer receives the message, so now let's set the
properties.

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" -P item_owner=fadams -P data_service=amqp-delivery \
"amq.topic/USA.news"

Awesome!!!! If we repeatedly run this command we can see that the messages get
distributed between the two consumers, which is *exactly* what we want to see.


Finally let's try sending with matching message properties, but with a
non-matching subject.

./spout --connection-options {protocol:amqp1.0} -b localhost \
--content "Hello World" -P item_owner=fadams -P data_service=amqp-delivery \
"amq.topic/USA.weather"

as expected the message doesn't get through to the consumers.
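
The whole pipeline (subject filter, then selector, then distribution across the
consumers sharing the queue) can be modelled in a few lines. This is a toy
sketch, not a description of the broker internals: the class name
SharedSubscription is hypothetical, only the "*" wildcard is handled, and
strict round-robin delivery is an assumption, since all that was observed is
that messages were spread between the two consumers.

```python
from itertools import cycle


def subject_matches(pattern, subject):
    """Word-by-word match supporting only the '*' wildcard (one word)."""
    p, s = pattern.split("."), (subject or "").split(".")
    return len(p) == len(s) and all(a in ("*", b) for a, b in zip(p, s))


def selector(props):
    """Logically equivalent to the selector given to drain above."""
    return (props.get("item_owner") == "fadams"
            and props.get("data_service") in ("amqp-delivery",
                                              "http-delivery"))


class SharedSubscription:
    """Toy shared subscription: one queue, consumers served in turn."""

    def __init__(self, pattern, consumers):
        self.pattern = pattern
        self.received = {c: [] for c in consumers}
        self._next = cycle(consumers)

    def publish(self, subject, content, props=None):
        props = props or {}
        # A message must pass the subject filter AND the selector.
        if subject_matches(self.pattern, subject) and selector(props):
            self.received[next(self._next)].append(content)
            return True
        return False


sub = SharedSubscription("*.news", ["drain-1", "drain-2"])
good = {"item_owner": "fadams", "data_service": "amqp-delivery"}
sub.publish("USA.news", "no properties")          # fails the selector
sub.publish("USA.news", "first", good)            # delivered
sub.publish("USA.news", "second", good)           # delivered
sub.publish("USA.weather", "wrong subject", good)  # fails the subject filter
print(sub.received)
```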


So we've been able to prove that Qpid AMQP 1.0 subscriptions can do all that we
could do with AMQP 0.10 subscriptions - and indeed far more. The only real gripe
seems to be the requirement for no hyphens in property names, which is
unfortunate.


*******************************************************************************
********************************* And finally *********************************

This run through has been done using the Qpid C++ Broker, mainly using
qpid::messaging spout and drain. I need to try it again using the Java Broker
to see what differences exist (I assume that the "topic" pseudo node concept is
specific to qpidd, though such a factory for creating subscription queues is a
really useful concept).

I also need to try the scenarios using JMS.

It was something of a disappointment that Proton Messenger didn't appear to have
a mechanism in place to be able to specify the link configuration and so was
limited to simple use cases. Hopefully more sophisticated subscription
capabilities will be introduced fairly soon.

I'd also quite like to know how "portable" the subscription approaches are
across different AMQP 1.0 providers. At a guess message selector style filters
are fairly broadly supported given JMS and also probably subject style filters
which are analogous to JMS "topics", but I don't really know what's out there.





