Author: rajith
Date: Wed Aug 15 16:29:13 2007
New Revision: 566403
URL: http://svn.apache.org/viewvc?view=rev&rev=566403
Log:
Added initial ConnectionFactory support to JMS
Rearranged package structure for qpid client
Added javadoc support for qpid client
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java
Removed:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
Modified:
incubator/qpid/trunk/qpid/java/client/pom.xml
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
Modified: incubator/qpid/trunk/qpid/java/client/pom.xml
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/pom.xml?view=diff&rev=566403&r1=566402&r2=566403
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/pom.xml (original)
+++ incubator/qpid/trunk/qpid/java/client/pom.xml Wed Aug 15 16:29:13 2007
@@ -188,6 +188,23 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+
<excludePackageNames>org.apache.qpid.*:org.apache.qpidity.jms:org.apache.qpidity.jms.*:org.apache.qpidity.client.impl</excludePackageNames>
+ <groups>
+ <group>
+ <title>API</title>
+ <packages>org.apache.qpidity.client</packages>
+ </group>
+ <group>
+ <title>Utility Package</title>
+ <packages>org.apache.qpidity.client.util</packages>
+ </group>
+ </groups>
+ </configuration>
+ </plugin>
</plugins>
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java?view=diff&rev=566403&r1=566402&r2=566403
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
Wed Aug 15 16:29:13 2007
@@ -14,6 +14,8 @@
import org.apache.qpidity.MinaHandler;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.SessionDelegate;
+import org.apache.qpidity.client.impl.ClientSession;
+import org.apache.qpidity.client.impl.ClientSessionDelegate;
public class Client implements org.apache.qpidity.client.Connection
@@ -23,6 +25,10 @@
private ExceptionListener _exceptionListner;
private final Lock _lock = new ReentrantLock();
+ /**
+ *
+ * @return returns a new connection to the broker.
+ */
public static org.apache.qpidity.client.Connection createConnection()
{
return new Client();
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java?view=diff&rev=566403&r1=566402&r2=566403
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
Wed Aug 15 16:29:13 2007
@@ -54,7 +54,7 @@
*
* @param data Data to be added or streamed.
*/
- public void addData(ByteBuffer src);
+ public void data(ByteBuffer src);
/**
* Indicates that the message has been fully received.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java?view=diff&rev=566403&r1=566402&r2=566403
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
Wed Aug 15 16:29:13 2007
@@ -336,6 +336,9 @@
*/
public void messageReject(RangeSet ranges, int code, String text);
+ /**
+ * @return the rejected message ranges
+ */
public RangeSet getRejectedMessages();
/**
@@ -350,7 +353,9 @@
*/
public void messageAcquire(RangeSet ranges, short mode);
-
+ /**
+ * @return returns the message ranges marked by the broker as acquired.
+ */
public RangeSet getAccquiredMessages();
/**
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java?view=auto&rev=566403
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSession.java
Wed Aug 15 16:29:13 2007
@@ -0,0 +1,125 @@
+package org.apache.qpidity.client.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpidity.Option;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.ExceptionListener;
+import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.client.Session;
+
+/**
+ * Implements a Qpid Session.
+ */
+public class ClientSession extends org.apache.qpidity.Session implements
org.apache.qpidity.client.Session
+{
+ private Map<String,MessagePartListener> _messageListeners = new
HashMap<String,MessagePartListener>();
+ private ExceptionListener _exceptionListner;
+ private RangeSet _acquiredMessages;
+ private RangeSet _rejectedMessages;
+
+ @Override public void sessionClose()
+ {
+ super.sessionClose();
+ }
+
+ public void messageAcknowledge(RangeSet ranges)
+ {
+ for (Range range : ranges)
+ {
+ for (long l = range.getLower(); l <= range.getUpper(); l++)
+ {
+ System.out.println("Acknowleding message for : " +
super.getCommand((int) l));
+ super.processed(l);
+ }
+ }
+ }
+
+ public void messageSubscribe(String queue, String destination, short
confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?>
filter, Option... options)
+ {
+ setMessageListener(destination,listener);
+ super.messageSubscribe(queue, destination, confirmMode, acquireMode,
filter, options);
+ }
+
+ public void messageTransfer(String destination, Message msg, short
confirmMode, short acquireMode) throws IOException
+ {
+ // The javadoc clearly says that this method is suitable for small
messages
+ // therefore reading the content in one shot.
+ super.messageTransfer(destination, confirmMode, acquireMode);
+ super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+ super.data(msg.readData());
+ super.endData();
+ }
+
+ public void messageStream(String destination, Message msg, short
confirmMode, short acquireMode) throws IOException
+ {
+ super.messageTransfer(destination, confirmMode, acquireMode);
+ super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+ boolean b = true;
+ int count = 0;
+ while(b)
+ {
+ try
+ {
+ System.out.println("count : " + count++);
+ super.data(msg.readData());
+ }
+ catch(EOFException e)
+ {
+ b = false;
+ }
+ }
+
+ super.endData();
+ }
+
+ public RangeSet getAccquiredMessages()
+ {
+ return _acquiredMessages;
+ }
+
+ public RangeSet getRejectedMessages()
+ {
+ return _rejectedMessages;
+ }
+
+ public void setMessageListener(String destination, MessagePartListener
listener)
+ {
+ _messageListeners.put(destination, listener);
+ }
+
+ public void setExceptionListener(ExceptionListener exceptionListner)
+ {
+ _exceptionListner = exceptionListner;
+ }
+
+ // ugly but necessary
+
+ void setAccquiredMessages(RangeSet acquiredMessages)
+ {
+ _acquiredMessages = acquiredMessages;
+ }
+
+ void setRejectedMessages(RangeSet rejectedMessages)
+ {
+ _rejectedMessages = rejectedMessages;
+ }
+
+ void notifyException(QpidException ex)
+ {
+ _exceptionListner.onException(ex);
+ }
+
+ Map<String,MessagePartListener> getMessageListerners()
+ {
+ return _messageListeners;
+ }
+}
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java?view=auto&rev=566403
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/ClientSessionDelegate.java
Wed Aug 15 16:29:13 2007
@@ -0,0 +1,86 @@
+package org.apache.qpidity.client.impl;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.ErrorCode;
+import org.apache.qpidity.Frame;
+import org.apache.qpidity.MessageAcquired;
+import org.apache.qpidity.MessageReject;
+import org.apache.qpidity.MessageTransfer;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.Session;
+import org.apache.qpidity.SessionClosed;
+import org.apache.qpidity.SessionDelegate;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.client.MessagePartListener;
+
+
+public class ClientSessionDelegate extends SessionDelegate
+{
+ private MessageTransfer _currentTransfer;
+ private MessagePartListener _currentMessageListener;
+
+ @Override public void sessionClosed(Session ssn,SessionClosed
sessionClosed)
+ {
+ ((ClientSession)ssn).notifyException(new
QpidException(sessionClosed.getReplyText(),ErrorCode.get(sessionClosed.getReplyCode()),null));
+ }
+
+ // --------------------------------------------
+ // Message methods
+ // --------------------------------------------
+ @Override public void data(Session ssn, Frame frame)
+ {
+ for (ByteBuffer b : frame)
+ {
+ _currentMessageListener.data(b);
+ }
+ if (frame.isLastSegment() && frame.isLastFrame())
+ {
+ _currentMessageListener.messageReceived();
+ }
+
+ }
+
+ @Override public void headers(Session ssn, Struct... headers)
+ {
+ _currentMessageListener.messageHeaders(headers);
+ }
+
+
+ @Override public void messageTransfer(Session session, MessageTransfer
currentTransfer)
+ {
+ _currentTransfer = currentTransfer;
+ _currentMessageListener =
((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination());
+ _currentMessageListener.messageTransfer(currentTransfer.getId());
+
+ //a better way is to tell the broker to stop the transfer
+ if (_currentMessageListener == null &&
_currentTransfer.getAcquireMode() == 1)
+ {
+ RangeSet transfers = new RangeSet();
+ transfers.add(_currentTransfer.getId());
+ session.messageRelease(transfers);
+ }
+ }
+
+
+ @Override public void messageReject(Session session, MessageReject struct)
+ {
+ for (Range range : struct.getTransfers())
+ {
+ for (long l = range.getLower(); l <= range.getUpper(); l++)
+ {
+ System.out.println("message rejected: " +
+ session.getCommand((int) l));
+ }
+ }
+ ((ClientSession)session).setRejectedMessages(struct.getTransfers());
+ ((ClientSession)session).notifyException(new QpidException("Message
Rejected",ErrorCode.MESSAGE_REJECTED,null));
+ }
+
+ @Override public void messageAcquired(Session session, MessageAcquired
struct)
+ {
+ ((ClientSession)session).setAccquiredMessages(struct.getTransfers());
+ }
+}
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java?view=auto&rev=566403
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/DemoClient.java
Wed Aug 15 16:29:13 2007
@@ -0,0 +1,89 @@
+package org.apache.qpidity.client.impl;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.Client;
+import org.apache.qpidity.client.Connection;
+import org.apache.qpidity.client.ExceptionListener;
+import org.apache.qpidity.client.MessageListener;
+import org.apache.qpidity.client.Session;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+
+public class DemoClient
+{
+ public static MessagePartListenerAdapter createAdapter()
+ {
+ return new MessagePartListenerAdapter(new MessageListener()
+ {
+ public void onMessage(Message m)
+ {
+ System.out.println("\n================== Received Msg
==================");
+ System.out.println("Message Id : " +
m.getMessageProperties().getMessageId());
+ System.out.println(m.toString());
+ System.out.println("================== End Msg
==================\n");
+ }
+
+ });
+ }
+
+ public static final void main(String[] args)
+ {
+ Connection conn = Client.createConnection();
+ try{
+ conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+
+ Session ssn = conn.createSession(50000);
+ ssn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(QpidException e)
+ {
+ System.out.println(e);
+ }
+ });
+ ssn.queueDeclare("queue1", null, null);
+ ssn.queueBind("queue1", "amq.direct", "queue1",null);
+ ssn.sync();
+
+ ssn.messageSubscribe("queue1", "myDest", (short)0,
(short)0,createAdapter(), null);
+
+ // queue
+ ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+ ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new
MessageProperties().setMessageId("123"));
+ ssn.data("this is the data");
+ ssn.endData();
+
+ //reject
+ ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+ ssn.data("this should be rejected");
+ ssn.headers(new DeliveryProperties().setRoutingKey("stocks"));
+ ssn.endData();
+ ssn.sync();
+
+ // topic subs
+ ssn.messageSubscribe("topic1", "myDest2", (short)0,
(short)0,createAdapter(), null);
+ ssn.messageSubscribe("topic2", "myDest3", (short)0,
(short)0,createAdapter(), null);
+ ssn.messageSubscribe("topic3", "myDest4", (short)0,
(short)0,createAdapter(), null);
+ ssn.sync();
+
+ ssn.queueDeclare("topic1", null, null);
+ ssn.queueBind("topic1", "amq.topic", "stock.*",null);
+ ssn.queueDeclare("topic2", null, null);
+ ssn.queueBind("topic2", "amq.topic", "stock.us.*",null);
+ ssn.queueDeclare("topic3", null, null);
+ ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null);
+ ssn.sync();
+
+ // topic
+ ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
+ ssn.data("Topic message");
+ ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new
MessageProperties().setMessageId("456"));
+ ssn.endData();
+ ssn.sync();
+ }
+
+}
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java?view=auto&rev=566403
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/impl/LargeMsgDemoClient.java
Wed Aug 15 16:29:13 2007
@@ -0,0 +1,74 @@
+package org.apache.qpidity.client.impl;
+
+import java.io.FileInputStream;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.Client;
+import org.apache.qpidity.client.Connection;
+import org.apache.qpidity.client.ExceptionListener;
+import org.apache.qpidity.client.MessageListener;
+import org.apache.qpidity.client.Session;
+import org.apache.qpidity.client.util.FileMessage;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+
+public class LargeMsgDemoClient
+{
+ public static MessagePartListenerAdapter createAdapter()
+ {
+ return new MessagePartListenerAdapter(new MessageListener()
+ {
+ public void onMessage(Message m)
+ {
+ System.out.println("\n================== Received Msg
==================");
+ System.out.println("Message Id : " +
m.getMessageProperties().getMessageId());
+ System.out.println(m.toString());
+ System.out.println("================== End Msg
==================\n");
+ }
+
+ });
+ }
+
+ public static final void main(String[] args)
+ {
+ Connection conn = Client.createConnection();
+ try{
+ conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+
+ Session ssn = conn.createSession(50000);
+ ssn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(QpidException e)
+ {
+ System.out.println(e);
+ }
+ });
+ ssn.queueDeclare("queue1", null, null);
+ ssn.queueBind("queue1", "amq.direct", "queue1",null);
+ ssn.sync();
+
+ ssn.messageSubscribe("queue1", "myDest", (short)0,
(short)0,createAdapter(), null);
+
+ try
+ {
+ FileMessage msg = new FileMessage(new
FileInputStream("/home/rajith/TestFile"),
+ 1024,
+ new
DeliveryProperties().setRoutingKey("queue1"),
+ new
MessageProperties().setMessageId("123"));
+
+ // queue
+ ssn.messageStream("amq.direct",msg, (short) 0, (short) 1);
+ ssn.sync();
+ }
+ catch(Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+
+}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java?view=diff&rev=566403&r1=566402&r2=566403
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
Wed Aug 15 16:29:13 2007
@@ -32,7 +32,7 @@
_currentMsg = new ByteBufferMessage(transferId);
}
- public void addData(ByteBuffer src)
+ public void data(ByteBuffer src)
{
try
{
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java?view=auto&rev=566403
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionFactoryImpl.java
Wed Aug 15 16:29:13 2007
@@ -0,0 +1,100 @@
+package org.apache.qpidity.jms;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
+import org.apache.qpidity.QpidException;
+
+public class ConnectionFactoryImpl implements
ConnectionFactory,QueueConnectionFactory, TopicConnectionFactory, Referenceable
+{
+ private String _host;
+ private int _port;
+ private String _defaultUsername;
+ private String _defaultPassword;
+ private String _virtualPath;
+ private String _url;
+
+ // Undefined at the moment
+ public ConnectionFactoryImpl(String url)
+ {
+ _url = url;
+ }
+
+ public ConnectionFactoryImpl(String host,int port,String
virtualHost,String defaultUsername,String defaultPassword)
+ {
+ _host = host;
+ _port = port;
+ _defaultUsername = defaultUsername;
+ _defaultPassword = defaultPassword;
+ _virtualPath = virtualHost;
+ }
+
+ public Connection createConnection() throws JMSException
+ {
+ try
+ {
+ return new
ConnectionImpl(_host,_port,_virtualPath,_defaultUsername,_defaultPassword);
+ }
+ catch(QpidException e)
+ {
+ // need to convert the qpid exception into jms exception
+ throw new JMSException("","");
+ }
+ }
+
+ public Connection createConnection(String username, String password)
throws JMSException
+ {
+ try
+ {
+ return new
ConnectionImpl(_host,_port,_virtualPath,username,password);
+ }
+ catch(QpidException e)
+ {
+ // need to convert the qpid exception into jms exception
+ throw new JMSException("","");
+ }
+ }
+
+ // ----------------------------------------
+ // Support for JMS 1.0 classes
+ // ----------------------------------------
+ public QueueConnection createQueueConnection() throws JMSException
+ {
+ return (QueueConnection) createConnection();
+ }
+
+ public QueueConnection createQueueConnection(String username, String
password) throws JMSException
+ {
+ return (QueueConnection) createConnection(username, password);
+ }
+
+ public TopicConnection createTopicConnection() throws JMSException
+ {
+ return (TopicConnection) createConnection();
+ }
+
+ public TopicConnection createTopicConnection(String username, String
password) throws JMSException
+ {
+ return (TopicConnection) createConnection(username, password);
+ }
+
+
+ // ----------------------------------------
+ // Support for JNDI
+ // ----------------------------------------
+ public Reference getReference() throws NamingException
+ {
+ return new Reference( ConnectionFactoryImpl.class.getName(),
+ new StringRefAddr(ConnectionFactoryImpl.class.getName(),_url));
+ }
+
+}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java?view=diff&rev=566403&r1=566402&r2=566403
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/ConnectionImpl.java
Wed Aug 15 16:29:13 2007
@@ -17,19 +17,38 @@
*/
package org.apache.qpidity.jms;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.qpidity.QpidException;
+import java.util.Vector;
-import javax.jms.*;
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
-import java.util.Vector;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.client.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Implements javax.jms.Connection, javax.jms.QueueConnection and
javax.jms.TopicConnection
*/
-public class ConnectionImpl implements Connection, QueueConnection,
TopicConnection
+public class ConnectionImpl implements Connection, QueueConnection,
TopicConnection, Referenceable
{
/**
* This class's logger
@@ -95,8 +114,10 @@
/**
* TODO define the parameters
*/
- public ConnectionImpl()
+ public ConnectionImpl(String host,int port,String virtualHost,String
username,String password) throws QpidException
{
+ _qpidConnection = Client.createConnection();
+ _qpidConnection.connect(host, port, virtualHost, username, password);
}
//---- Interface javax.jms.Connection ---//
@@ -477,5 +498,11 @@
protected org.apache.qpidity.client.Connection getQpidConnection()
{
return _qpidConnection;
+ }
+
+ public Reference getReference() throws NamingException
+ {
+ return new Reference( ConnectionImpl.class.getName(),
+ new StringRefAddr(ConnectionImpl.class.getName(),""));
}
}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java?view=diff&rev=566403&r1=566402&r2=566403
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueImpl.java
Wed Aug 15 16:29:13 2007
@@ -43,7 +43,7 @@
{
super(session, name);
_exchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
- _exchangeClass = ExchangeDefaults.FANOUT_EXCHANGE_CLASS;
+ _exchangeClass = ExchangeDefaults.DIRECT_EXCHANGE_CLASS;
_queueName = name;
// check that this queue exists on the server
// As passive is set, the server will not create the queue.
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java?view=diff&rev=566403&r1=566402&r2=566403
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/XAConnectionImpl.java
Wed Aug 15 16:29:13 2007
@@ -28,6 +28,11 @@
*/
public class XAConnectionImpl extends ConnectionImpl implements XAConnection
{
+ public XAConnectionImpl(String host, int port, String virtualHost, String
username, String password) throws QpidException
+ {
+ super(host, port, virtualHost, username, password);
+ }
+
/**
* Creates an XASession.
*