Author: rajith
Date: Fri Jan 11 10:26:00 2008
New Revision: 611251

URL: http://svn.apache.org/viewvc?rev=611251&view=rev
Log:
Added AMQP API examples. The intention is to include these in the M3 release.
Also they should interoperate with python and c++ examples.
Currently I couldn't test complete interoperability because some C++ examples
do not compile and some Python examples don't seem to run out of the box
due to dependency issues.

Added:
    
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/
    
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/
    
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
   (with props)
    
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
   (with props)
    
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
   (with props)
    
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/
    
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java
   (with props)
    
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java
   (with props)
    
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java
   (with props)
    
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/
    
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
   (with props)
    
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
   (with props)

Added: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java?rev=611251&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
 Fri Jan 11 10:26:00 2008
@@ -0,0 +1,59 @@
+package org.apache.qpid.example.amqpexample.direct;
+
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+
+/**
+ *  This creates the queue "message_queue" and binds it to the
+ *  amq.direct exchange using the routing key "routing_key"
+ */
+public class DeclareQueue
+{
+
+    /**
+     *  Connects to the broker on localhost:5672, declares and binds
+     *  the queue, syncs the session and closes session and connection.
+     *
+     *  @param args command line arguments (not used)
+     */
+    public static void main(String[] args)
+    {
+        // Create connection
+        Connection con = Client.createConnection();
+        try
+        {
+            con.connect("localhost", 5672, "test", "guest", "guest");
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error connecting to broker");
+            e.printStackTrace();
+            // The remaining steps need a live connection, so stop here
+            // instead of carrying on with a connection that failed to open.
+            return;
+        }
+
+        // Create session
+        Session session = con.createSession(0);
+
+        // declare and bind queue
+        session.queueDeclare("message_queue", null, null);
+        session.queueBind("message_queue", "amq.direct", "routing_key", null);
+
+        // confirm completion
+        session.sync();
+
+        //cleanup
+        session.sessionClose();
+        try
+        {
+            con.close();
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error closing broker connection");
+            e.printStackTrace();
+        }
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DeclareQueue.java
------------------------------------------------------------------------------
    svn:executable = *

Added: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java?rev=611251&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
 Fri Jan 11 10:26:00 2008
@@ -0,0 +1,85 @@
+package org.apache.qpid.example.amqpexample.direct;
+
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.transport.DeliveryProperties;
+
+/**
+ *  Example producer for the amq.direct exchange.
+ */
+public class DirectProducer
+{
+    /**
+     *  Sends a single message with the given body to the
+     *  amq.direct exchange.
+     *
+     *  @param session session the message is sent on
+     *  @param deliveryProps delivery properties holding the routing key
+     *  @param body text used as the message data
+     */
+    private static void send(Session session, DeliveryProperties deliveryProps, String body)
+    {
+        session.messageTransfer("amq.direct",
+                                Session.TRANSFER_CONFIRM_MODE_REQUIRED,
+                                Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+        session.header(deliveryProps);
+        session.data(body);
+        session.endData();
+    }
+
+    /**
+     *  This sends 10 messages to the
+     *  amq.direct exchange using the
+     *  routing key as "routing_key",
+     *  followed by a final "That's all, folks!" message
+     *
+     *  @param args command line arguments (not used)
+     */
+    public static void main(String[] args)
+    {
+        // Create connection
+        Connection con = Client.createConnection();
+        try
+        {
+            con.connect("localhost", 5672, "test", "guest", "guest");
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error connecting to broker");
+            e.printStackTrace();
+            // The remaining steps need a live connection, so stop here
+            // instead of carrying on with a connection that failed to open.
+            return;
+        }
+
+        // Create session
+        Session session = con.createSession(0);
+        DeliveryProperties deliveryProps = new DeliveryProperties();
+        deliveryProps.setRoutingKey("routing_key");
+
+        for (int i=0; i<10; i++)
+        {
+            send(session, deliveryProps, "Message " + i);
+        }
+
+        // final message; the Listener example stops when it receives it
+        send(session, deliveryProps, "That's all, folks!");
+
+        // confirm completion
+        session.sync();
+
+        //cleanup
+        session.sessionClose();
+        try
+        {
+            con.close();
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error closing broker connection");
+            e.printStackTrace();
+        }
+    }
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/DirectProducer.java
------------------------------------------------------------------------------
    svn:executable = *

Added: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java?rev=611251&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
 Fri Jan 11 10:26:00 2008
@@ -0,0 +1,143 @@
+package org.apache.qpid.example.amqpexample.direct;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.MessageListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+
+/**
+ * This listens to messages on a queue and terminates
+ * when it sees the final message
+ *
+ */
+public class Listener implements MessageListener
+{
+    // Written by onMessage() and polled by main(). main() is blocked in its
+    // wait loop meanwhile, so the callback must arrive on another thread;
+    // volatile makes the update visible to the polling thread.
+    volatile boolean finish = false;
+
+    /**
+     *  Prints the data of a received message and remembers whether
+     *  it was the final "That's all, folks!" message.
+     *
+     *  @param m the received message
+     */
+    public void onMessage(Message m)
+    {
+        String data = null;
+
+        try
+        {
+            ByteBuffer buf = m.readData();
+            byte[] b = new byte[buf.remaining()];
+            buf.get(b);
+            // NOTE(review): decoded with the platform default charset
+            data = new String(b);
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error reading message");
+            e.printStackTrace();
+        }
+
+        System.out.println("Message: " + data);
+
+        if (data != null && data.equals("That's all, folks!"))
+        {
+            finish = true;
+        }
+    }
+
+    /**
+     *  @return true once the final message has been received
+     */
+    public boolean isFinished()
+    {
+        return finish;
+    }
+
+    /**
+     *  This subscribes to "message_queue", prints the messages
+     *  it receives and shuts down once the final message arrived
+     *
+     *  @param args command line arguments (not used)
+     */
+    public static void main(String[] args)
+    {
+        // Create connection
+        Connection con = Client.createConnection();
+        try
+        {
+            con.connect("localhost", 5672, "test", "guest", "guest");
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error connecting to broker");
+            e.printStackTrace();
+            // The remaining steps need a live connection, so stop here
+            // instead of carrying on with a connection that failed to open.
+            return;
+        }
+
+        // Create session
+        Session session = con.createSession(0);
+
+        // Create an instance of the listener
+        Listener listener = new Listener();
+
+        // create a subscription
+        session.messageSubscribe("message_queue",
+                                 "listener_destination",
+                                 Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                                 Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+                                 new MessagePartListenerAdapter(listener),
+                                 null);
+
+        // issue credits
+        session.messageFlow("listener_destination",
+                            Session.MESSAGE_FLOW_UNIT_BYTE,
+                            Session.MESSAGE_FLOW_MAX_BYTES);
+        session.messageFlow("listener_destination",
+                            Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                            11);
+
+        // confirm completion
+        session.sync();
+
+        // wait until we have received all the messages; sleep between
+        // checks so that the waiting thread does not spin at full speed
+        try
+        {
+            while (!listener.isFinished())
+            {
+                Thread.sleep(100);
+            }
+        }
+        catch(InterruptedException e)
+        {
+            // keep the interrupt status and go on to the cleanup
+            Thread.currentThread().interrupt();
+        }
+        System.out.println("Shutting down listener for listener_destination");
+        session.messageCancel("listener_destination");
+
+        //cleanup
+        session.sessionClose();
+
+        try
+        {
+            con.close();
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error closing broker connection");
+            e.printStackTrace();
+        }
+    }
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/direct/Listener.java
------------------------------------------------------------------------------
    svn:executable = *

Added: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java?rev=611251&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java
 Fri Jan 11 10:26:00 2008
@@ -0,0 +1,59 @@
+package org.apache.qpid.example.amqpexample.fannout;
+
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+
+/**
+ *  This creates the queue "message_queue" and binds it to the
+ *  amq.fanout exchange
+ */
+public class DeclareQueue
+{
+
+    /**
+     *  Connects to the broker on localhost:5672, declares and binds
+     *  the queue, syncs the session and closes session and connection.
+     *
+     *  @param args command line arguments (not used)
+     */
+    public static void main(String[] args)
+    {
+        // Create connection
+        Connection con = Client.createConnection();
+        try
+        {
+            con.connect("localhost", 5672, "test", "guest", "guest");
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error connecting to broker");
+            e.printStackTrace();
+            // The remaining steps need a live connection, so stop here
+            // instead of carrying on with a connection that failed to open.
+            return;
+        }
+
+        // Create session
+        Session session = con.createSession(0);
+
+        // declare and bind queue
+        session.queueDeclare("message_queue", null, null);
+        session.queueBind("message_queue", "amq.fanout",null, null);
+
+        // confirm completion
+        session.sync();
+
+        //cleanup
+        session.sessionClose();
+        try
+        {
+            con.close();
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error closing broker connection");
+            e.printStackTrace();
+        }
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/DeclareQueue.java
------------------------------------------------------------------------------
    svn:executable = *

Added: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java?rev=611251&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java
 Fri Jan 11 10:26:00 2008
@@ -0,0 +1,84 @@
+package org.apache.qpid.example.amqpexample.fannout;
+
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.transport.DeliveryProperties;
+
+/**
+ *  Example producer for the amq.fanout exchange.
+ */
+public class FannoutProducer
+{
+    /**
+     *  Sends a single message with the given body to the
+     *  amq.fanout exchange.
+     *
+     *  @param session session the message is sent on
+     *  @param deliveryProps delivery properties holding the routing key
+     *  @param body text used as the message data
+     */
+    private static void send(Session session, DeliveryProperties deliveryProps, String body)
+    {
+        session.messageTransfer("amq.fanout",
+                                Session.TRANSFER_CONFIRM_MODE_REQUIRED,
+                                Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+        session.header(deliveryProps);
+        session.data(body);
+        session.endData();
+    }
+
+    /**
+     *  This sends 10 messages to the
+     *  amq.fanout exchange, followed by
+     *  a final "That's all, folks!" message
+     *
+     *  @param args command line arguments (not used)
+     */
+    public static void main(String[] args)
+    {
+        // Create connection
+        Connection con = Client.createConnection();
+        try
+        {
+            con.connect("localhost", 5672, "test", "guest", "guest");
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error connecting to broker");
+            e.printStackTrace();
+            // The remaining steps need a live connection, so stop here
+            // instead of carrying on with a connection that failed to open.
+            return;
+        }
+
+        // Create session
+        Session session = con.createSession(0);
+        DeliveryProperties deliveryProps = new DeliveryProperties();
+        deliveryProps.setRoutingKey("routing_key");
+
+        for (int i=0; i<10; i++)
+        {
+            send(session, deliveryProps, "Message " + i);
+        }
+
+        // final message; the Listener example stops when it receives it
+        send(session, deliveryProps, "That's all, folks!");
+
+        // confirm completion
+        session.sync();
+
+        //cleanup
+        session.sessionClose();
+        try
+        {
+            con.close();
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error closing broker connection");
+            e.printStackTrace();
+        }
+    }
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/FannoutProducer.java
------------------------------------------------------------------------------
    svn:executable = *

Added: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java?rev=611251&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java
 Fri Jan 11 10:26:00 2008
@@ -0,0 +1,142 @@
+package org.apache.qpid.example.amqpexample.fannout;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.MessageListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+
+/**
+ * This listens to messages on a queue and terminates
+ * when it sees the final message
+ *
+ */
+public class Listener implements MessageListener
+{
+    // Written by onMessage() and polled by main(). main() is blocked in its
+    // wait loop meanwhile, so the callback must arrive on another thread;
+    // volatile makes the update visible to the polling thread.
+    volatile boolean finish = false;
+
+    /**
+     *  Prints the data of a received message and remembers whether
+     *  it was the final "That's all, folks!" message.
+     *
+     *  @param m the received message
+     */
+    public void onMessage(Message m)
+    {
+        String data = null;
+
+        try
+        {
+            ByteBuffer buf = m.readData();
+            byte[] b = new byte[buf.remaining()];
+            buf.get(b);
+            // NOTE(review): decoded with the platform default charset
+            data = new String(b);
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error reading message");
+            e.printStackTrace();
+        }
+
+        System.out.println("Message: " + data);
+
+        if (data != null && data.equals("That's all, folks!"))
+        {
+            finish = true;
+        }
+    }
+
+    /**
+     *  @return true once the final message has been received
+     */
+    public boolean isFinished()
+    {
+        return finish;
+    }
+
+    /**
+     *  This subscribes to "message_queue", prints the messages
+     *  it receives and shuts down once the final message arrived
+     *
+     *  @param args command line arguments (not used)
+     */
+    public static void main(String[] args)
+    {
+        // Create connection
+        Connection con = Client.createConnection();
+        try
+        {
+            con.connect("localhost", 5672, "test", "guest", "guest");
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error connecting to broker");
+            e.printStackTrace();
+            // The remaining steps need a live connection, so stop here
+            // instead of carrying on with a connection that failed to open.
+            return;
+        }
+
+        // Create session
+        Session session = con.createSession(0);
+
+        // Create an instance of the listener
+        Listener listener = new Listener();
+
+        // create a subscription
+        session.messageSubscribe("message_queue",
+                                 "listener_destination",
+                                 Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                                 Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+                                 new MessagePartListenerAdapter(listener),
+                                 null);
+
+        // issue credits
+        session.messageFlow("listener_destination",
+                            Session.MESSAGE_FLOW_UNIT_BYTE,
+                            Session.MESSAGE_FLOW_MAX_BYTES);
+        session.messageFlow("listener_destination",
+                            Session.MESSAGE_FLOW_UNIT_MESSAGE,
+                            11);
+
+        // confirm completion
+        session.sync();
+
+        // wait until we have received all the messages; sleep between
+        // checks so that the waiting thread does not spin at full speed
+        try
+        {
+            while (!listener.isFinished())
+            {
+                Thread.sleep(100);
+            }
+        }
+        catch(InterruptedException e)
+        {
+            // keep the interrupt status and go on to the cleanup
+            Thread.currentThread().interrupt();
+        }
+        System.out.println("Shutting down listener for listener_destination");
+        session.messageCancel("listener_destination");
+
+        //cleanup
+        session.sessionClose();
+        try
+        {
+            con.close();
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error closing broker connection");
+            e.printStackTrace();
+        }
+    }
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/fannout/Listener.java
------------------------------------------------------------------------------
    svn:executable = *

Added: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java?rev=611251&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
 Fri Jan 11 10:26:00 2008
@@ -0,0 +1,184 @@
+package org.apache.qpid.example.amqpexample.pubsub;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.MessageListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.Option;
+
+
+/**
+ *  This listens to messages on four topic queues and terminates
+ *  when it has seen the final message once for every queue
+ */
+public class TopicListener implements MessageListener
+{
+    // Number of queues main() sets up. Every queue is also bound to the
+    // "control" key, so the final message is expected once per queue.
+    private static final int QUEUE_COUNT = 4;
+
+    // Written by onMessage() and polled by main(). main() is blocked in its
+    // wait loop meanwhile, so the callback must arrive on another thread;
+    // volatile makes the update visible to the polling thread.
+    volatile boolean finish = false;
+
+    // number of final messages seen so far; only touched by onMessage()
+    int count = 0;
+
+    /**
+     *  Prints the data and routing key of a received message and
+     *  counts the final "That's all, folks!" messages.
+     *
+     *  @param m the received message
+     */
+    public void onMessage(Message m)
+    {
+        String data = null;
+
+        try
+        {
+            ByteBuffer buf = m.readData();
+            byte[] b = new byte[buf.remaining()];
+            buf.get(b);
+            // NOTE(review): decoded with the platform default charset
+            data = new String(b);
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error reading message");
+            e.printStackTrace();
+        }
+
+        System.out.println("Message: " + data + " with routing_key " +
+                           m.getDeliveryProperties().getRoutingKey());
+
+        if (data != null && data.equals("That's all, folks!"))
+        {
+            count++;
+            if (count == QUEUE_COUNT)
+            {
+                finish = true;
+            }
+        }
+    }
+
+    /**
+     *  Declares an exclusive, auto-delete queue, binds it to amq.topic
+     *  with the given routing key and with "control", subscribes this
+     *  listener to it and issues credits.
+     *
+     *  @param session session to issue the commands on
+     *  @param queueName name of the queue, also used as the destination
+     *  @param routingKey binding key for the topic exchange
+     */
+    public void prepareQueue(Session session,String queueName,String routingKey)
+    {
+        session.queueDeclare(queueName, null, null, Option.EXCLUSIVE, Option.AUTO_DELETE);
+        session.queueBind(queueName, "amq.topic", routingKey, null);
+        session.queueBind(queueName, "amq.topic", "control", null);
+
+        session.messageSubscribe(queueName,queueName,
+                                 Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                                 Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE,
+                                 new MessagePartListenerAdapter(this),
+                                 null, Option.NO_OPTION);
+        // issue credits
+        session.messageFlow(queueName, Session.MESSAGE_FLOW_UNIT_BYTE, Session.MESSAGE_FLOW_MAX_BYTES);
+        session.messageFlow(queueName, Session.MESSAGE_FLOW_UNIT_MESSAGE, 24);
+    }
+
+    /**
+     *  Cancels the subscription for the given destination.
+     *
+     *  @param session session to issue the command on
+     *  @param dest destination name used when subscribing
+     */
+    public void cancelSubscription(Session session,String dest)
+    {
+        session.messageCancel(dest);
+    }
+
+    /**
+     *  @return true once the final message was seen for all queues
+     */
+    public boolean isFinished()
+    {
+        return finish;
+    }
+
+    /**
+     *  This subscribes to the queues "usa", "europe", "news" and
+     *  "weather" and shuts down once the final message has
+     *  arrived on each of them
+     *
+     *  @param args command line arguments (not used)
+     */
+    public static void main(String[] args)
+    {
+        // Create connection
+        Connection con = Client.createConnection();
+        try
+        {
+            con.connect("localhost", 5672, "test", "guest", "guest");
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error connecting to broker");
+            e.printStackTrace();
+            // The remaining steps need a live connection, so stop here
+            // instead of carrying on with a connection that failed to open.
+            return;
+        }
+
+        // Create session
+        Session session = con.createSession(0);
+
+        // Create an instance of the listener
+        TopicListener listener = new TopicListener();
+
+        listener.prepareQueue(session,"usa", "usa.#");
+        listener.prepareQueue(session,"europe", "europe.#");
+        listener.prepareQueue(session,"news", "#.news");
+        listener.prepareQueue(session,"weather", "#.weather");
+
+        // confirm completion
+        session.sync();
+
+        // wait until we have received all the messages; sleep between
+        // checks so that the waiting thread does not spin at full speed
+        try
+        {
+            while (!listener.isFinished())
+            {
+                Thread.sleep(100);
+            }
+        }
+        catch(InterruptedException e)
+        {
+            // keep the interrupt status and go on to the cleanup
+            Thread.currentThread().interrupt();
+        }
+        System.out.println("Shutting down listener for listener_destination");
+        listener.cancelSubscription(session,"usa");
+        listener.cancelSubscription(session,"europe");
+        listener.cancelSubscription(session,"news");
+        listener.cancelSubscription(session,"weather");
+
+        //cleanup
+        session.sessionClose();
+        try
+        {
+            con.close();
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error closing broker connection");
+            e.printStackTrace();
+        }
+    }
+
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicListener.java
------------------------------------------------------------------------------
    svn:executable = *

Added: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java?rev=611251&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
 Fri Jan 11 10:26:00 2008
@@ -0,0 +1,109 @@
+package org.apache.qpid.example.amqpexample.pubsub;
+
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.transport.DeliveryProperties;
+
+/**
+ *  This publishes messages to the amq.topic exchange under several
+ *  routing keys, followed by a final message on the "control" key
+ */
+public class TopicPublisher
+{
+    /**
+     *  Sends the messages "Message 0" to "Message 4" to the amq.topic
+     *  exchange using the given routing key.
+     *
+     *  @param session session to send the messages on
+     *  @param routing_key routing key set on every message
+     */
+    public void publishMessages(Session session, String routing_key)
+    {
+        // Set the routing key once, we'll use the same routing key for all
+        // messages.
+        DeliveryProperties deliveryProps = new DeliveryProperties();
+        deliveryProps.setRoutingKey(routing_key);
+
+        for (int i=0; i<5; i++)
+        {
+            session.messageTransfer("amq.topic",
+                                    Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
+                                    Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+            session.header(deliveryProps);
+            session.data("Message " + i);
+            session.endData();
+        }
+    }
+
+    /**
+     *  Sends the final "That's all, folks!" message to the amq.topic
+     *  exchange using the routing key "control".
+     *
+     *  @param session session to send the message on
+     */
+    public void noMoreMessages(Session session)
+    {
+        session.messageTransfer("amq.topic",
+                                Session.TRANSFER_CONFIRM_MODE_REQUIRED,
+                                Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
+        session.header(new DeliveryProperties().setRoutingKey("control"));
+        session.data("That's all, folks!");
+        session.endData();
+    }
+
+    /**
+     *  This publishes five messages for each of the routing keys
+     *  "usa.news", "usa.weather", "europe.news" and "europe.weather"
+     *  and then the final message
+     *
+     *  @param args command line arguments (not used)
+     */
+    public static void main(String[] args)
+    {
+        // Create connection
+        Connection con = Client.createConnection();
+        try
+        {
+            con.connect("localhost", 5672, "test", "guest", "guest");
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error connecting to broker");
+            e.printStackTrace();
+            // The remaining steps need a live connection, so stop here
+            // instead of carrying on with a connection that failed to open.
+            return;
+        }
+
+        // Create session
+        Session session = con.createSession(0);
+
+        // Create an instance of the publisher
+        TopicPublisher publisher = new TopicPublisher();
+
+        publisher.publishMessages(session, "usa.news");
+        publisher.publishMessages(session, "usa.weather");
+        publisher.publishMessages(session, "europe.news");
+        publisher.publishMessages(session, "europe.weather");
+
+        // tell the listeners that we are done; TopicListener only
+        // terminates after it has received this final message
+        publisher.noMoreMessages(session);
+
+        // confirm completion
+        session.sync();
+
+        //cleanup
+        session.sessionClose();
+        try
+        {
+            con.close();
+        }
+        catch(Exception e)
+        {
+            System.out.print("Error closing broker connection");
+            e.printStackTrace();
+        }
+    }
+}

Propchange: 
incubator/qpid/trunk/qpid/java/client/example/src/main/java/org/apache/qpid/example/amqpexample/pubsub/TopicPublisher.java
------------------------------------------------------------------------------
    svn:executable = *


Reply via email to