This patch contains updates to the python examples.

- Renaming of the declaration programs
- Added listeners for the direct and fanout examples

Jonathan

[EMAIL PROTECTED] examples]$ svn status
M      direct/direct_producer.py
D      direct/config_direct_exchange.py
A      direct/listener.py
A      direct/declare_queues.py
D      fanout/config_fanout_exchange.py
A      fanout/listener.py
A      fanout/declare_queues.py

Index: direct/direct_producer.py
===================================================================
--- direct/direct_producer.py	(revision 604214)
+++ direct/direct_producer.py	(working copy)
@@ -48,4 +48,3 @@
 # Clean up before exiting so there are no open threads.
 
 session.session_close()
-
Index: direct/config_direct_exchange.py
===================================================================
--- direct/config_direct_exchange.py	(revision 604214)
+++ direct/config_direct_exchange.py	(working copy)
@@ -1,53 +0,0 @@
-#!/usr/bin/env python
-"""
- config_direct_exchange.py
-
- Creates and binds a queue on an AMQP direct exchange.
-
- All messages using the routing key "routing_key" are
- sent to the queue named "message_queue".
-"""
-
-import qpid
-from qpid.client import Client
-from qpid.content import Content
-from qpid.queue import Empty
-
-#----- Initialization -----------------------------------
-
-#  Set parameters for login
-
-host="127.0.0.1"
-port=5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
-user="guest"
-password="guest"
-
-#  Create a client and log in to it.
-
-client = Client(host, port, qpid.spec.load(amqp_spec))
-client.start({"LOGIN": user, "PASSWORD": password})
-
-session = client.session()
-session.session_open()
-
-#----- Create a queue -------------------------------------
-
-# Create a queue named "listener" on channel 1, and bind it 
-# to the "amq.direct" exchange.
-# 
-# queue_declare() creates an AMQP queue, which is held
-# on the broker. Published messages are sent to the AMQP queue, 
-# from which messages are delivered to consumers. 
-# 
-# queue_bind() determines which messages are routed to a queue. 
-# Route all messages with the routing key "routing_key" to
-# the AMQP queue named "message_queue".
-
-session.queue_declare(queue="message_queue")
-session.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="routing_key")
-
-#----- Cleanup ---------------------------------------------
-
-session.session_close()
-
Index: direct/listener.py
===================================================================
--- direct/listener.py	(revision 0)
+++ direct/listener.py	(revision 0)
@@ -0,0 +1,87 @@
+#!/usr/bin/env python
+"""
+ listener.py
+
+ This AMQP client reads messages from a message
+ queue named "message_queue". It is implemented
+ as a message listener.
+"""
+
+import qpid
+from qpid.client  import Client
+from qpid.content import Content
+from qpid.queue   import Empty
+from time         import sleep
+
+
+#----- Message Receive Handler -----------------------------
+class Receiver:
+  def __init__ (self):
+    self.finalReceived = False
+
+  def isFinal (self):
+    return self.finalReceived
+    
+  def Handler (self, message):
+    content = message.content.body
+    print content
+    if content == "That's all, folks!":
+      self.finalReceived = True
+
+      #  Messages are not removed from the queue until they are
+      #  acknowledged. Using cumulative=True, all messages from the session
+      #  up to and including the one identified by the delivery tag are
+      #  acknowledged. This is more efficient, because there are fewer
+      #  network round-trips.
+      message.complete(cumulative=True)
+
+
+#----- Initialization --------------------------------------
+
+#  Set parameters for login
+
+host="127.0.0.1"
+port=5672
+amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
+user="guest"
+password="guest"
+
+#  Create a client and log in to it.
+
+client = Client(host, port, qpid.spec.load(amqp_spec))
+client.start({"LOGIN": user, "PASSWORD": password})
+
+session = client.session()
+session.session_open()
+
+#----- Read from queue --------------------------------------------
+
+# Now let's create a local client queue and tell it to read
+# incoming messages.
+
+# The consumer tag identifies the client-side queue.
+
+consumer_tag = "consumer1"
+queue = client.queue(consumer_tag)
+
+# Call message_subscribe() to tell the broker to deliver messages
+# from the AMQP queue to this local client queue. The broker will
+# start delivering messages as soon as message_subscribe() is called.
+
+session.message_subscribe(queue="message_queue", destination=consumer_tag)
+session.message_flow(consumer_tag, 0, 0xFFFFFFFF)  # Kill these?
+session.message_flow(consumer_tag, 1, 0xFFFFFFFF) # Kill these?
+
+receiver = Receiver ()
+queue.listen (receiver.Handler)
+
+while not receiver.isFinal ():
+  sleep (1)
+
+
+#----- Cleanup ------------------------------------------------
+
+# Clean up before exiting so there are no open threads.
+#
+
+session.session_close()
Index: direct/declare_queues.py
===================================================================
--- direct/declare_queues.py	(revision 0)
+++ direct/declare_queues.py	(revision 0)
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+"""
+ declare_queues.py
+
+ Creates and binds a queue on an AMQP direct exchange.
+
+ All messages using the routing key "routing_key" are
+ sent to the queue named "message_queue".
+"""
+
+import qpid
+from qpid.client import Client
+from qpid.content import Content
+from qpid.queue import Empty
+
+#----- Initialization -----------------------------------
+
+#  Set parameters for login
+
+host="127.0.0.1"
+port=5672
+amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
+user="guest"
+password="guest"
+
+#  Create a client and log in to it.
+
+client = Client(host, port, qpid.spec.load(amqp_spec))
+client.start({"LOGIN": user, "PASSWORD": password})
+
+session = client.session()
+session.session_open()
+
+#----- Create a queue -------------------------------------
+
+# Create a queue named "message_queue" on channel 1, and bind it
+# to the "amq.direct" exchange.
+# 
+# queue_declare() creates an AMQP queue, which is held
+# on the broker. Published messages are sent to the AMQP queue, 
+# from which messages are delivered to consumers. 
+# 
+# queue_bind() determines which messages are routed to a queue. 
+# Route all messages with the routing key "routing_key" to
+# the AMQP queue named "message_queue".
+
+session.queue_declare(queue="message_queue")
+session.queue_bind(exchange="amq.direct", queue="message_queue", routing_key="routing_key")
+
+#----- Cleanup ---------------------------------------------
+
+session.session_close()
+
Index: fanout/config_fanout_exchange.py
===================================================================
--- fanout/config_fanout_exchange.py	(revision 604214)
+++ fanout/config_fanout_exchange.py	(working copy)
@@ -1,54 +0,0 @@
-#!/usr/bin/env python
-"""
- config_direct_exchange.py
-
- Creates and binds a queue on an AMQP direct exchange.
-
- All messages using the routing key "routing_key" are
- sent to the queue named "message_queue".
-"""
-
-import qpid
-from qpid.client import Client
-from qpid.content import Content
-from qpid.queue import Empty
-
-#----- Initialization -----------------------------------
-
-#  Set parameters for login
-
-host="127.0.0.1"
-port=5672
-amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
-user="guest"
-password="guest"
-
-#  Create a client and log in to it.
-
-client = Client(host, port, qpid.spec.load(amqp_spec))
-client.start({"LOGIN": user, "PASSWORD": password})
-
-session = client.session()
-session.session_open()
-
-#----- Create a queue -------------------------------------
-
-# Create a queue named "listener" on channel 1, and bind it 
-# to the "amq.fanout" exchange.
-# 
-# queue_declare() creates an AMQP queue, which is held
-# on the broker. Published messages are sent to the AMQP queue, 
-# from which messages are delivered to consumers. 
-# 
-# queue_bind() determines which messages are routed to a queue. 
-# Route all messages with the routing key "routing_key" to
-# the AMQP queue named "message_queue".
-
-session.queue_declare(queue="message_queue")
-session.queue_bind(exchange="amq.fanout", queue="message_queue")
-
-#----- Cleanup ---------------------------------------------
-
-# Clean up before exiting so there are no open threads.
-
-session.session_close()
Index: fanout/listener.py
===================================================================
--- fanout/listener.py	(revision 0)
+++ fanout/listener.py	(revision 0)
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+"""
+ listener.py
+
+ This AMQP client reads messages from a message
+ queue named "message_queue".
+"""
+
+import qpid
+from qpid.client  import Client
+from qpid.content import Content
+from qpid.queue   import Empty
+from time         import sleep
+
+
+#----- Message Receive Handler -----------------------------
+class Receiver:
+  def __init__ (self):
+    self.finalReceived = False
+
+  def isFinal (self):
+    return self.finalReceived
+    
+  def Handler (self, message):
+    content = message.content.body
+    print content
+    if content == "That's all, folks!":
+      self.finalReceived = True
+
+      #  Messages are not removed from the queue until they are
+      #  acknowledged. Using cumulative=True, all messages from the session
+      #  up to and including the one identified by the delivery tag are
+      #  acknowledged. This is more efficient, because there are fewer
+      #  network round-trips.
+      message.complete(cumulative=True)
+
+
+#----- Initialization --------------------------------------
+
+#  Set parameters for login
+
+host="127.0.0.1"
+port=5672
+amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
+user="guest"
+password="guest"
+
+#  Create a client and log in to it.
+
+client = Client(host, port, qpid.spec.load(amqp_spec))
+client.start({"LOGIN": user, "PASSWORD": password})
+
+session = client.session()
+session.session_open()
+
+#----- Read from queue --------------------------------------------
+
+# Now let's create a local client queue and tell it to read
+# incoming messages.
+
+# The consumer tag identifies the client-side queue.
+
+consumer_tag = "consumer1"
+queue = client.queue(consumer_tag)
+
+# Call message_subscribe() to tell the broker to deliver messages
+# from the AMQP queue to this local client queue. The broker will
+# start delivering messages as soon as message_subscribe() is called.
+
+session.message_subscribe(queue="message_queue", destination=consumer_tag)
+session.message_flow(consumer_tag, 0, 0xFFFFFFFF)  # Kill these?
+session.message_flow(consumer_tag, 1, 0xFFFFFFFF) # Kill these?
+
+receiver = Receiver ()
+queue.listen (receiver.Handler)
+
+while not receiver.isFinal ():
+  sleep (1)
+
+
+#----- Cleanup ------------------------------------------------
+
+# Clean up before exiting so there are no open threads.
+#
+
+session.session_close()
Index: fanout/declare_queues.py
===================================================================
--- fanout/declare_queues.py	(revision 0)
+++ fanout/declare_queues.py	(revision 0)
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+"""
+ declare_queues.py
+
+ Creates and binds a queue on an AMQP fanout exchange.
+
+ All messages sent to the "amq.fanout" exchange are
+ delivered to the queue named "message_queue".
+"""
+
+import qpid
+from qpid.client import Client
+from qpid.content import Content
+from qpid.queue import Empty
+
+#----- Initialization -----------------------------------
+
+#  Set parameters for login
+
+host="127.0.0.1"
+port=5672
+amqp_spec="/usr/share/amqp/amqp.0-10-preview.xml"
+user="guest"
+password="guest"
+
+#  Create a client and log in to it.
+
+client = Client(host, port, qpid.spec.load(amqp_spec))
+client.start({"LOGIN": user, "PASSWORD": password})
+
+session = client.session()
+session.session_open()
+
+#----- Create a queue -------------------------------------
+
+# Create a queue named "message_queue" on channel 1, and bind it
+# to the "amq.fanout" exchange.
+# 
+# queue_declare() creates an AMQP queue, which is held
+# on the broker. Published messages are sent to the AMQP queue, 
+# from which messages are delivered to consumers. 
+# 
+# queue_bind() determines which messages are routed to a queue. 
+# Route all messages sent to the "amq.fanout" exchange to
+# the AMQP queue named "message_queue".
+
+session.queue_declare(queue="message_queue")
+session.queue_bind(exchange="amq.fanout", queue="message_queue")
+
+#----- Cleanup ---------------------------------------------
+
+# Clean up before exiting so there are no open threads.
+
+session.session_close()

Reply via email to