Hi,

this is my first post to this list, please bear with my rather lengthy
mail.

I've just started toying around with qpid and have some questions I
couldn't find
answers to in the documentation:

I'm running qpidd using qpid-cpp-server 0.14 and python-qpid 0.14
(as available as rpm for this RHEL 6.3 server). I'm aware this might be
version-related issues so please tell me right away if I need to build
and install a more up-to-date version :-).
Btw are there more recent rpms known to work on RHEL 6?

Here goes:

I'm sending messages with a simple python sender to such an address:
$ i=0
$ let i=$i+1; ./qpid_send.py "mytopic; {assert:always, create:always, node:
{type: topic, durable: True}, link: {reliability: exactly-once}}" "hello:
$i"

...and receive the messages with a simple python receiver using this
address:
$ ./qpid_listen.py "mytopic; {node: {durable: True}, link: {name:sub1_1,
reliability: exactly-once, durable: True}}"

What I want is reliable publish-subscribe, i.e. no message loss is
tolerable:
- ideally a quality-of-service of exactly-once (but I've spotted in the
docs
that this is not supported (yet?) and falls back to at-least-once)
- messages that originated during receiver downtime must be redelivered

It seems like I have this basically working, but only if my receiving
program
*does not* explicitly close the connection when shut down. This is the
source code
of the Python receiver:

#########################################################
#!/usr/bin/env python2.7


# Import the modules we need
import sys
import getpass
# abuse system-python installed package
sys.path.append('/usr/lib/python2.6/site-packages')
from qpid.messaging import *


def main():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'address', nargs='+', help='AMQP address(es)')
    parser.add_argument(
        '--broker', default='localhost:5672',
        help='broker connection address (default: %(default)s)')
    parser.add_argument(
        '--session', default=getpass.getuser(),
        help='session id (default: %(default)s)')
    args = parser.parse_args()
    listen(args.broker, args.session, args.address)


def listen(broker, session_name, address):
    connection = Connection(broker)
    try:
        connection.open()

        # Define the session
        session = connection.session(name=session_name)

        # Define listener(s)
        receivers = []
        for addr in address:
            receiver = session.receiver(addr)
            # needed to allow for prefetch which is in turn necessary for
            # multiple sources and the next_receiver() stuff
            receiver.capacity = 10
            receivers.append(receiver)

        startline = ">" * 79
        endline = "<" * 79
        while True:
            receiver = session.next_receiver()
            msg = receiver.fetch()
            # Output the message
            print startline
            print "%s received: %s" % (receiver.source, msg)
            print "    content: '%s'" % msg.content
            print endline
            session.acknowledge(msg)
    except MessagingError, err:
        print 'Messaging error:', err
        raise
    finally:
        # Closing the connection seems to cancel interest in the topic?! I
do
        # not get redelivery of messages missed due to receiver downtime if
        # closing on shutdown...
        #connection.close()
        pass


if __name__ == '__main__':
    main()

#########################################################

Re-delivery of messages that were sent during receiver downtime does only
work for me
when *not* calling connection.close() on receiver shutdown. Is this
intended behaviour?
Could you point me to relevant parts in the docs?

Next question: If a program wants to subscribe to multiple topics I need
separate
receivers for each of the topics(?).

It seems I haven't quite understood the use of names as in "session name"
and "link name" here.

Does the session name have any practical relevance?

Is my assumption correct that I *can not* reuse a link name for a 2nd topic
subscription?
E.g. if I do this...

$ ./qpid_listen.py "mytopic; {node: {durable: True}, link: {name:sub1,
reliability: exactly-once, durable: True}}" "mytopic2; {node: {durable:
True}, link: {name:sub1, reliability: exactly-once, durable: True}}"

(note the same link name "sub1" used for both topics)

...and only the sender for "mytopic; {assert:always, create:always, node:
{type: topic, durable: True}, link: {reliability: exactly-once}}"
is active, I'm seeing the messages retrieved alternately be the 2
receivers:


>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
mytopic; {node: {durable: True}, link: {name:sub1, reliability:
exactly-once, durable: True}} received: Message(durable=True,
content='hallo: 1')
    content: 'hallo: 1'
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
mytopic2; {node: {durable: True}, link: {name:sub1, reliability:
exactly-once, durable: True}} received: Message(durable=True,
content='hallo: 2')
    content: 'hallo: 2'
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<

I guess the link basically constitutes a unique internal input queue for a
receiver
so name reuse will actually have the 2nd receiver dispatch messages from
this queue,
too, although this is "connected" to "mytopic", not "mytopic2".

Hence, I'd need to use distinct link names for the 2 subscriptions.

Is this understanding correct?

Thanks for any hints,
best regards
Holger

(Attaching for reference: The Python sender source code:

#########################################################

#!/usr/bin/env python2.7


# Import the modules we need
import sys
# abuse system-python installed package
sys.path.append('/usr/lib/python2.6/site-packages')
from qpid.messaging import *


def main():
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--broker', default='localhost:5672',
        help='broker connection address (default: %(default)s)')
    parser.add_argument(
        'address', help='AMQP address')
    parser.add_argument(
        'data', help='message text data')
    args = parser.parse_args()
    send(args.broker, args.address, args.data)


def send(broker, address, data):
    connection = Connection(broker)
    try:
        connection.open()

        # Define the session
        session = connection.session()

        # Define a sender
        sender = session.sender(address)

        # Send a simple "Hello world!" message to the queue
        msg = Message(data, durable=True)
        sender.send(msg)

        # Output the message
        print "Sent '%s'" % msg.content

    except MessagingError, err:
        print 'Messaging error:', err
        raise
    finally:
        connection.close()


if __name__ == '__main__':
    main()

#########################################################
)

Landesbank Baden-Wuerttemberg
Anstalt des oeffentlichen Rechts
Hauptsitze: Stuttgart, Karlsruhe, Mannheim, Mainz
HRA 12704
Amtsgericht Stuttgart


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to