Author: rajith
Date: Thu Aug 9 18:49:45 2007
New Revision: 564451
URL: http://svn.apache.org/viewvc?view=rev&rev=564451
Log:
Added a Toy Exchange that does same basic routing for direct and topic.
Should be good enough for Arnaud to test atleast the basic JMS functionality.
Added a FileMessage to demo Martins requirment. Haven't tested yet.
The Toy Broker can now accept subscriptions and transfer messages to clients
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
Removed:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/impl/
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java?view=auto&rev=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Client.java
Thu Aug 9 18:49:45 2007
@@ -0,0 +1,108 @@
+package org.apache.qpidity.client;
+
+import java.net.URL;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpidity.Channel;
+import org.apache.qpidity.Connection;
+import org.apache.qpidity.ConnectionClose;
+import org.apache.qpidity.ConnectionDelegate;
+import org.apache.qpidity.MinaHandler;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.SessionDelegate;
+
+
+public class Client implements org.apache.qpidity.client.Connection
+{
+ private AtomicInteger _channelNo = new AtomicInteger();
+ private Connection _conn;
+ private ExceptionListener _exceptionListner;
+ private final Lock _lock = new ReentrantLock();
+
+ public static org.apache.qpidity.client.Connection createConnection()
+ {
+ return new Client();
+ }
+
+ public void connect(String host, int port,String virtualHost,String
username, String password) throws QpidException
+ {
+ Condition negotiationComplete = _lock.newCondition();
+ _lock.lock();
+
+ ConnectionDelegate connectionDelegate = new ConnectionDelegate()
+ {
+ public SessionDelegate getSessionDelegate()
+ {
+ return new ClientSessionDelegate();
+ }
+
+ @Override public void connectionClose(Channel context,
ConnectionClose struct)
+ {
+ _exceptionListner.onException(new QpidException("Server closed
the connection: Reason " + struct.getReplyText(),struct.getReplyCode(),null));
+ }
+ };
+
+ connectionDelegate.setCondition(_lock,negotiationComplete);
+ connectionDelegate.setUsername(username);
+ connectionDelegate.setPassword(password);
+ connectionDelegate.setVirtualHost(virtualHost);
+
+ _conn = MinaHandler.connect(host, port,connectionDelegate);
+
+ _conn.getOutputHandler().handle(_conn.getHeader().toByteBuffer());
+
+ try
+ {
+ negotiationComplete.await();
+ }
+ catch (Exception e)
+ {
+ //
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /*
+ * Until the dust settles with the URL disucssion
+ * I am not going to implement this.
+ */
+ public void connect(URL url) throws QpidException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public void close() throws QpidException
+ {
+ Channel ch = _conn.getChannel(0);
+ ch.connectionClose(0, "client is closing", 0, 0);
+ //need to close the connection underneath as well
+ }
+
+ public Session createSession(long expiryInSeconds)
+ {
+ Channel ch = _conn.getChannel(_channelNo.incrementAndGet());
+ ClientSession ssn = new ClientSession();
+ ssn.attach(ch);
+ ssn.sessionOpen(expiryInSeconds);
+
+ return ssn;
+ }
+
+ public DtxSession createDTXSession(int expiryInSeconds)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void setExceptionListener(ExceptionListener exceptionListner)
+ {
+ _exceptionListner = exceptionListner;
+ }
+
+}
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java?view=auto&rev=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSession.java
Thu Aug 9 18:49:45 2007
@@ -0,0 +1,99 @@
+package org.apache.qpidity.client;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpidity.Option;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.api.Message;
+
+/**
+ * Implements a Qpid Sesion.
+ */
+public class ClientSession extends org.apache.qpidity.Session implements
org.apache.qpidity.client.Session
+{
+ private Map<String,MessagePartListener> _messageListeners = new
HashMap<String,MessagePartListener>();
+ private ExceptionListener _exceptionListner;
+ private RangeSet _acquiredMessages;
+ private RangeSet _rejectedMessages;
+ private Map<String,List<RangeSet>> _unackedMessages = new
HashMap<String,List<RangeSet>>();
+
+ @Override public void sessionClose()
+ {
+ // release all unacked messages and then issues a close
+ super.sessionClose();
+ }
+
+ public void messageAcknowledge(RangeSet ranges)
+ {
+ for (Range range : ranges)
+ {
+ for (long l = range.getLower(); l <= range.getUpper(); l++)
+ {
+ System.out.println("Acknowleding message for : " +
super.getCommand((int) l));
+ super.processed(l);
+ }
+ }
+ }
+
+ public void messageSubscribe(String queue, String destination, short
confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?>
filter, Option... options)
+ {
+ setMessageListener(destination,listener);
+ super.messageSubscribe(queue, destination, confirmMode, acquireMode,
filter, options);
+ }
+
+ public void messageTransfer(String exchange, Message msg, short
confirmMode, short acquireMode)
+ {
+ // need to break it down into small pieces
+ super.messageTransfer(exchange, confirmMode, acquireMode);
+ super.headers(msg.getDeliveryProperties(),msg.getMessageProperties());
+ // super.data(bytes); *
+ // super.endData()
+ }
+
+
+ public RangeSet getAccquiredMessages()
+ {
+ return _acquiredMessages;
+ }
+
+ public RangeSet getRejectedMessages()
+ {
+ return _rejectedMessages;
+ }
+
+ public void setMessageListener(String destination, MessagePartListener
listener)
+ {
+ _messageListeners.put(destination, listener);
+ }
+
+ public void setExceptionListener(ExceptionListener exceptionListner)
+ {
+ _exceptionListner = exceptionListner;
+ }
+
+ // ugly but nessacery
+
+ void setAccquiredMessages(RangeSet acquiredMessages)
+ {
+ _acquiredMessages = acquiredMessages;
+ }
+
+ void setRejectedMessages(RangeSet rejectedMessages)
+ {
+ _rejectedMessages = rejectedMessages;
+ }
+
+ void notifyException(QpidException ex)
+ {
+ _exceptionListner.onException(ex);
+ }
+
+ Map<String,MessagePartListener> getMessageListerners()
+ {
+ return _messageListeners;
+ }
+}
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java?view=auto&rev=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
Thu Aug 9 18:49:45 2007
@@ -0,0 +1,78 @@
+package org.apache.qpidity.client;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.Frame;
+import org.apache.qpidity.MessageAcquired;
+import org.apache.qpidity.MessageReject;
+import org.apache.qpidity.MessageTransfer;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.RangeSet;
+import org.apache.qpidity.Session;
+import org.apache.qpidity.SessionDelegate;
+import org.apache.qpidity.Struct;
+
+
+public class ClientSessionDelegate extends SessionDelegate
+{
+ private MessageTransfer _currentTransfer;
+ private MessagePartListener _currentMessageListener;
+
+ @Override public void data(Session ssn, Frame frame)
+ {
+ for (ByteBuffer b : frame)
+ {
+ _currentMessageListener.addData(b);
+ }
+ if (frame.isLastSegment() && frame.isLastFrame())
+ {
+ _currentMessageListener.messageReceived();
+ }
+
+ }
+
+ @Override public void headers(Session ssn, Struct... headers)
+ {
+ _currentMessageListener.messageHeaders(headers);
+ }
+
+
+ @Override public void messageTransfer(Session session, MessageTransfer
currentTransfer)
+ {
+ _currentTransfer = currentTransfer;
+ _currentMessageListener =
((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination());
+
+ //a better way is to tell the broker to stop the transfer
+ if (_currentMessageListener == null &&
_currentTransfer.getAcquireMode() == 1)
+ {
+ RangeSet transfers = new RangeSet();
+ transfers.add(_currentTransfer.getId());
+ session.messageRelease(transfers);
+ }
+ }
+
+ // --------------------------------------------
+ // Message methods
+ // --------------------------------------------
+
+
+ @Override public void messageReject(Session session, MessageReject struct)
+ {
+ for (Range range : struct.getTransfers())
+ {
+ for (long l = range.getLower(); l <= range.getUpper(); l++)
+ {
+ System.out.println("message rejected: " +
+ session.getCommand((int) l));
+ }
+ }
+ ((ClientSession)session).setRejectedMessages(struct.getTransfers());
+ ((ClientSession)session).notifyException(new QpidException("Message
Rejected",0,null));
+ }
+
+ @Override public void messageAcquired(Session session, MessageAcquired
struct)
+ {
+ ((ClientSession)session).setAccquiredMessages(struct.getTransfers());
+ }
+}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Connection.java
Thu Aug 9 18:49:45 2007
@@ -20,6 +20,7 @@
import java.net.URL;
+import java.util.UUID;
import org.apache.qpidity.QpidException;
@@ -46,7 +47,7 @@
* @throws QpidException If the communication layer fails to connect with
the broker.
*/
public void connect(URL url) throws QpidException;
-
+
/**
* Close this connection.
*
@@ -83,5 +84,6 @@
*
* @param exceptionListner The execptionListener
*/
+
public void setExceptionListener(ExceptionListener exceptionListner);
}
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java?view=auto&rev=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/DemoClient.java
Thu Aug 9 18:49:45 2007
@@ -0,0 +1,84 @@
+package org.apache.qpidity.client;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
+
+public class DemoClient
+{
+ public static MessagePartListenerAdapter createAdapter()
+ {
+ return new MessagePartListenerAdapter(new MessageListener()
+ {
+ public void onMessage(Message m)
+ {
+ System.out.println("\n================== Received Msg
==================");
+ System.out.println("Message Id : " +
m.getMessageProperties().getMessageId());
+ System.out.println(m.toString());
+ System.out.println("================== End Msg
==================\n");
+ }
+
+ });
+ }
+
+ public static final void main(String[] args)
+ {
+ Connection conn = Client.createConnection();
+ try{
+ conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+ }catch(Exception e){
+ e.printStackTrace();
+ }
+
+ Session ssn = conn.createSession(50000);
+ ssn.setExceptionListener(new ExceptionListener()
+ {
+ public void onException(QpidException e)
+ {
+ System.out.println(e);
+ }
+ });
+ ssn.queueDeclare("queue1", null, null);
+ ssn.queueBind("queue1", "amq.direct", "queue1",null);
+ ssn.sync();
+
+ ssn.messageSubscribe("queue1", "myDest", (short)0,
(short)0,createAdapter(), null);
+
+ // queue
+ ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+ ssn.headers(new DeliveryProperties().setRoutingKey("queue1"),new
MessageProperties().setMessageId("123"));
+ ssn.data("this is the data");
+ ssn.endData();
+
+ //reject
+ ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+ ssn.data("this should be rejected");
+ ssn.headers(new DeliveryProperties().setRoutingKey("stocks"));
+ ssn.endData();
+ ssn.sync();
+
+ // topic subs
+ ssn.messageSubscribe("topic1", "myDest2", (short)0,
(short)0,createAdapter(), null);
+ ssn.messageSubscribe("topic2", "myDest3", (short)0,
(short)0,createAdapter(), null);
+ ssn.messageSubscribe("topic3", "myDest4", (short)0,
(short)0,createAdapter(), null);
+ ssn.sync();
+
+ ssn.queueDeclare("topic1", null, null);
+ ssn.queueBind("topic1", "amq.topic", "stock.*",null);
+ ssn.queueDeclare("topic2", null, null);
+ ssn.queueBind("topic2", "amq.topic", "stock.us.*",null);
+ ssn.queueDeclare("topic3", null, null);
+ ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null);
+ ssn.sync();
+
+ // topic
+ ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
+ ssn.data("Topic message");
+ ssn.headers(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new
MessageProperties().setMessageId("456"));
+ ssn.endData();
+ ssn.sync();
+ }
+
+}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/Session.java
Thu Aug 9 18:49:45 2007
@@ -20,7 +20,6 @@
import java.nio.ByteBuffer;
import java.util.Map;
-import java.util.UUID;
import org.apache.qpidity.Option;
import org.apache.qpidity.RangeSet;
@@ -66,13 +65,6 @@
* <p> When a session is suspend any operation of this session and of the
associated resources are unavailable.
*/
public void sessionSuspend();
-
- /**
- * This will resume an existing session
- * <p> Upon resume the session is attached with an underlying channel
- * hence making operation on this session available.
- */
- public void sessionResume(UUID sessionId);
//------------------------------------------------------
// Messaging methods
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java?view=auto&rev=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
Thu Aug 9 18:49:45 2007
@@ -0,0 +1,99 @@
+package org.apache.qpidity.client.util;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * FileMessage provides pull style semantics for
+ * larges messages backed by a disk.
+ * Instead of loading all data into memeory it uses
+ * FileChannel to map regions of the file into memeory
+ * at a time.
+ *
+ * The write methods are not supported.
+ *
+ * From the standpoint of performance it is generally
+ * only worth mapping relatively large files into memory.
+ *
+ * FileMessage msg = new FileMessage(in,delProps,msgProps);
+ * session.messageTransfer(dest,msg,0,0);
+ *
+ * The messageTransfer method will read the file in chunks
+ * and stream it.
+ *
+ */
+public class FileMessage implements Message
+{
+ private MessageProperties _messageProperties;
+ private DeliveryProperties _deliveryProperties;
+ private FileChannel _fileChannel;
+ private int _chunkSize;
+ private long _fileSize;
+ private long _pos = 0;
+
+ public FileMessage(FileInputStream in,int chunkSize,DeliveryProperties
deliveryProperties,MessageProperties messageProperties)throws IOException
+ {
+ _messageProperties = messageProperties;
+ _deliveryProperties = deliveryProperties;
+
+ _fileChannel = in.getChannel();
+ _chunkSize = chunkSize;
+ _fileSize = _fileChannel.size();
+
+ if (_fileSize <= _chunkSize)
+ {
+ _chunkSize = (int)_fileSize;
+ }
+ }
+
+ public void appendData(byte[] src)
+ {
+ throw new UnsupportedOperationException("This Message is read only
after the initial source");
+ }
+
+ public void appendData(ByteBuffer src)
+ {
+ throw new UnsupportedOperationException("This Message is read only
after the initial source");
+ }
+
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _deliveryProperties;
+ }
+
+ public MessageProperties getMessageProperties()
+ {
+ return _messageProperties;
+ }
+
+ public void readData(byte[] target) throws IOException
+ {
+ int readLen = target.length <= _chunkSize ? target.length : _chunkSize;
+ if (_pos + readLen > _fileSize)
+ {
+ readLen = (int)(_fileSize - _pos);
+ }
+ MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY,
_pos, readLen);
+ _pos += readLen;
+ bb.get(target);
+ }
+
+ public ByteBuffer readData() throws IOException
+ {
+ if (_pos + _chunkSize > _fileSize)
+ {
+ _chunkSize = (int)(_fileSize - _pos);
+ }
+ MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY,
_pos, _chunkSize);
+ _pos += _chunkSize;
+ return bb;
+ }
+
+}
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java?view=auto&rev=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
(added)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
Thu Aug 9 18:49:45 2007
@@ -0,0 +1,149 @@
+package org.apache.qpidity.client.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.qpidity.DeliveryProperties;
+import org.apache.qpidity.MessageProperties;
+import org.apache.qpidity.Struct;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.client.MessageListener;
+import org.apache.qpidity.client.MessagePartListener;
+
+/**
+ * This is a simple message assembler.
+ * Will call onMessage method of the adaptee
+ * when all message data is read.
+ *
+ * This is a good convinience utility for handling
+ * small messages
+ */
+public class MessagePartListenerAdapter implements MessagePartListener
+{
+ MessageListener _adaptee;
+ Message _currentMsg;
+ DeliveryProperties _currentDeliveryProps;
+ MessageProperties _currentMessageProps;
+
+ public MessagePartListenerAdapter(MessageListener listener)
+ {
+ _adaptee = listener;
+
+ // temp solution.
+ _currentMsg = new Message()
+ {
+ Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
+ ByteBuffer _readBuffer;
+ private int dataSize;
+
+ public void appendData(byte[] src) throws IOException
+ {
+ appendData(ByteBuffer.wrap(src));
+ }
+
+ public void appendData(ByteBuffer src) throws IOException
+ {
+ _data.offer(src);
+ dataSize += src.remaining();
+ }
+
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _currentDeliveryProps;
+ }
+
+ public MessageProperties getMessageProperties()
+ {
+ return _currentMessageProps;
+ }
+
+ // since we provide the message only after completion
+ // we can assume that when this method is called we have
+ // received all data.
+ public void readData(byte[] target) throws IOException
+ {
+ if (_data.size() >0 && _readBuffer == null)
+ {
+ buildReadBuffer();
+ }
+
+ _readBuffer.get(target);
+ }
+
+ public ByteBuffer readData() throws IOException
+ {
+ if (_data.size() >0 && _readBuffer == null)
+ {
+ buildReadBuffer();
+ }
+
+ return _readBuffer;
+ }
+
+ private void buildReadBuffer()
+ {
+ //optimize for the simple cases
+ if(_data.size() == 1)
+ {
+ _readBuffer = _data.element().duplicate();
+ }
+ else
+ {
+ _readBuffer = ByteBuffer.allocate(dataSize);
+ for(ByteBuffer buf:_data)
+ {
+ _readBuffer.put(buf);
+ }
+ }
+ }
+
+ //hack for testing
+ @Override public String toString()
+ {
+ if (_data.size() >0 && _readBuffer == null)
+ {
+ buildReadBuffer();
+ }
+ byte[] b = new byte[_readBuffer.limit()];
+ _readBuffer.get(b);
+ return new String(b);
+ }
+ };
+ }
+
+ public void addData(ByteBuffer src)
+ {
+ try
+ {
+ _currentMsg.appendData(src);
+ }
+ catch(IOException e)
+ {
+ // A chance for IO exception
+ // doesn't occur as we are using
+ // a ByteBuffer
+ }
+ }
+
+ public void messageHeaders(Struct... headers)
+ {
+ for(Struct struct: headers)
+ {
+ if(struct instanceof DeliveryProperties)
+ {
+ _currentDeliveryProps = (DeliveryProperties)struct;
+ }
+ else if (struct instanceof MessageProperties)
+ {
+ _currentMessageProps = (MessageProperties)struct;
+ }
+ }
+ }
+
+ public void messageReceived()
+ {
+ _adaptee.onMessage(_currentMsg);
+ }
+}
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
Thu Aug 9 18:49:45 2007
@@ -18,13 +18,13 @@
package org.apache.qpidity.jms;
import org.apache.qpidity.jms.message.QpidMessage;
-import org.apache.qpidity.impl.MessagePartListenerAdapter;
import org.apache.qpidity.RangeSet;
import org.apache.qpidity.QpidException;
import org.apache.qpidity.Option;
import org.apache.qpidity.filter.MessageFilter;
import org.apache.qpidity.filter.JMSSelectorFilter;
import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.exchange.ExchangeDefaults;
import javax.jms.*;
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
(original)
+++
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/QueueBrowserImpl.java
Thu Aug 9 18:49:45 2007
@@ -26,9 +26,9 @@
import javax.jms.QueueBrowser;
import org.apache.qpidity.client.MessagePartListener;
+import org.apache.qpidity.client.util.MessagePartListenerAdapter;
import org.apache.qpidity.filter.JMSSelectorFilter;
import org.apache.qpidity.filter.MessageFilter;
-import org.apache.qpidity.impl.MessagePartListenerAdapter;
/**
* Implementation of the JMS QueueBrowser interface
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Connection.java
Thu Aug 9 18:49:45 2007
@@ -111,6 +111,8 @@
}
// not sure if this is the right place
+ System.out.println("\n--------------------Broker Start Connection
Negotiation -----------------------\n");
+
getChannel(0).connectionStart(header.getMajor(), header.getMinor(),
null, "PLAIN", "utf8");
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ConnectionDelegate.java
Thu Aug 9 18:49:45 2007
@@ -46,8 +46,8 @@
*/
public abstract class ConnectionDelegate extends Delegate<Channel>
{
- private String _username;
- private String _password;
+ private String _username = "guest";
+ private String _password = "guest";;
private String _mechanism;
private String _virtualHost;
private SaslClient saslClient;
@@ -70,6 +70,7 @@
//-----------------------------------------------
@Override public void connectionStart(Channel context, ConnectionStart
struct)
{
+ System.out.println("\n--------------------Client Start Connection
Negotiation -----------------------\n");
System.out.println("The broker has sent connection-start");
String mechanism = null;
@@ -132,15 +133,19 @@
String knownHosts = struct.getKnownHosts();
System.out.println("The broker has opened the connection for use");
System.out.println("The broker supplied the following hosts for
failover " + knownHosts);
- _negotiationCompleteLock.lock();
- try
- {
- _negotiationComplete.signalAll();
- }
- finally
+ if(_negotiationCompleteLock != null)
{
- _negotiationCompleteLock.unlock();
+ _negotiationCompleteLock.lock();
+ try
+ {
+ _negotiationComplete.signalAll();
+ }
+ finally
+ {
+ _negotiationCompleteLock.unlock();
+ }
}
+ System.out.println("\n-------------------- Client End Connection
Negotiation -----------------------\n");
}
public void connectionRedirect(Channel context, ConnectionRedirect struct)
@@ -240,8 +245,9 @@
@Override public void connectionOpen(Channel context, ConnectionOpen
struct)
{
String hosts = "amqp:1223243232325";
- System.out.println("The client has sent connection-open-ok");
+ System.out.println("The client has sent connection-open");
context.connectionOpenOk(hosts);
+ System.out.println("\n-------------------- Broker End Connection
Negotiation -----------------------\n");
}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
Thu Aug 9 18:49:45 2007
@@ -20,19 +20,16 @@
*/
package org.apache.qpidity;
-import java.io.IOException;
+import static org.apache.qpidity.Functions.str;
+import java.io.IOException;
import java.nio.ByteBuffer;
-
import java.util.ArrayList;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
-import static org.apache.qpidity.Functions.*;
-
/**
* ToyBroker
@@ -43,21 +40,28 @@
class ToyBroker extends SessionDelegate
{
- private Map<String,Queue<Message>> queues;
+ private ToyExchange exchange;
private MessageTransfer xfr = null;
private DeliveryProperties props = null;
private Struct[] headers = null;
private List<Frame> frames = null;
-
- public ToyBroker(Map<String,Queue<Message>> queues)
+ private Map<String,String> consumers = new HashMap<String,String>();
+
+ public ToyBroker(ToyExchange exchange)
{
- this.queues = queues;
+ this.exchange = exchange;
}
@Override public void queueDeclare(Session ssn, QueueDeclare qd)
{
- queues.put(qd.getQueue(), new LinkedList());
- System.out.println("declared queue: " + qd.getQueue());
+ exchange.createQueue(qd.getQueue());
+ System.out.println("\n==================> declared queue: " +
qd.getQueue() + "\n");
+ }
+
+ @Override public void queueBind(Session ssn, QueueBind qb)
+ {
+ exchange.bindQueue(qb.getExchange(), qb.getRoutingKey(),qb.getQueue());
+ System.out.println("\n==================> bound queue: " +
qb.getQueue() + " with routing key " + qb.getRoutingKey() + "\n");
}
@Override public void queueQuery(Session ssn, QueueQuery qq)
@@ -65,6 +69,12 @@
QueueQueryResult result = new QueueQueryResult().queue(qq.getQueue());
ssn.executionResult(qq.getId(), result);
}
+
+ @Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
+ {
+ consumers.put(ms.getDestination(),ms.getQueue());
+ System.out.println("\n==================> message subscribe : " +
ms.getDestination() + "\n");
+ }
@Override public void messageTransfer(Session ssn, MessageTransfer xfr)
{
@@ -88,16 +98,7 @@
props = (DeliveryProperties) hdr;
}
}
-
- if (props != null && !props.getDiscardUnroutable())
- {
- String dest = xfr.getDestination();
- if (!queues.containsKey(dest))
- {
- reject(ssn);
- }
- }
-
+
this.headers = headers;
}
@@ -115,16 +116,17 @@
if (frame.isLastSegment() && frame.isLastFrame())
{
String dest = xfr.getDestination();
- Queue queue = queues.get(dest);
- if (queue == null)
+ Message m = new Message(headers, frames);
+
+ if (exchange.route(dest,props.getRoutingKey(),m))
{
- reject(ssn);
+ System.out.println("queued " + m);
+ dispatchMessages(ssn);
}
else
{
- Message m = new Message(headers, frames);
- queue.offer(m);
- System.out.println("queued " + m);
+
+ reject(ssn);
}
ssn.processed(xfr);
xfr = null;
@@ -145,8 +147,35 @@
ssn.messageReject(ranges, 0, "no such destination");
}
}
+
+ private void transferMessage(Session ssn,String dest, Message m)
+ {
+ System.out.println("\n==================> Transfering message to: "
+dest + "\n");
+ ssn.messageTransfer(dest, (short)0, (short)0);
+ ssn.headers(m.headers);
+ for (Frame f : m.frames)
+ {
+ for (ByteBuffer b : f)
+ {
+ ssn.data(b);
+ }
+ }
+ ssn.endData();
+ }
+
+ public void dispatchMessages(Session ssn)
+ {
+ for (String dest: consumers.keySet())
+ {
+ Message m = exchange.getQueue(consumers.get(dest)).poll();
+ if(m != null)
+ {
+ transferMessage(ssn,dest,m);
+ }
+ }
+ }
- private class Message
+ class Message
{
private final Struct[] headers;
private final List<Frame> frames;
@@ -188,14 +217,12 @@
public static final void main(String[] args) throws IOException
{
- final Map<String,Queue<Message>> queues =
- new HashMap<String,Queue<Message>>();
-
+ final ToyExchange exchange = new ToyExchange();
ConnectionDelegate delegate = new ConnectionDelegate()
{
public SessionDelegate getSessionDelegate()
{
- return new ToyBroker(queues);
+ return new ToyBroker(exchange);
}
};
Added:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java?view=auto&rev=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
(added)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
Thu Aug 9 18:49:45 2007
@@ -0,0 +1,132 @@
+package org.apache.qpidity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Queue;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.qpidity.ToyBroker.Message;
+
+public class ToyExchange
+{
+ final static String DIRECT = "amq.direct";
+ final static String TOPIC = "amq.topic";
+
+ private Map<String,List<Queue<Message>>> directEx = new
HashMap<String,List<Queue<Message>>>();
+ private Map<String,List<Queue<Message>>> topicEx = new
HashMap<String,List<Queue<Message>>>();
+ private Map<String,Queue<Message>> queues = new
HashMap<String,Queue<Message>>();
+
+ public void createQueue(String name)
+ {
+ queues.put(name, new LinkedList<Message>());
+ }
+
+ public Queue<Message> getQueue(String name)
+ {
+ return queues.get(name);
+ }
+
+ public void bindQueue(String type,String binding,String queueName)
+ {
+ Queue<Message> queue = queues.get(queueName);
+ binding = normalizeKey(binding);
+ if(DIRECT.equals(type))
+ {
+
+ if (directEx.containsKey(binding))
+ {
+ List<Queue<Message>> list = directEx.get(binding);
+ list.add(queue);
+ }
+ else
+ {
+ List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+ list.add(queue);
+ directEx.put(binding,list);
+ }
+ }
+ else
+ {
+ if (topicEx.containsKey(binding))
+ {
+ List<Queue<Message>> list = topicEx.get(binding);
+ list.add(queue);
+ }
+ else
+ {
+ List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+ list.add(queue);
+ topicEx.put(binding,list);
+ }
+ }
+ }
+
+ public boolean route(String dest,String routingKey,Message msg)
+ {
+ List<Queue<Message>> queues;
+ if(DIRECT.equals(dest))
+ {
+ queues = directEx.get(routingKey);
+ }
+ else
+ {
+ queues = matchWildCard(routingKey);
+ }
+ if(queues != null && queues.size()>0)
+ {
+ System.out.println("Message stored in " + queues.size() + " queues");
+ storeMessage(msg,queues);
+ return true;
+ }
+ else
+ {
+ System.out.println("Message unroutable " + msg);
+ return false;
+ }
+ }
+
+ private String normalizeKey(String routingKey)
+ {
+ if(routingKey.indexOf(".*")>1)
+ {
+ return routingKey.substring(0,routingKey.indexOf(".*"));
+ }
+ else
+ {
+ return routingKey;
+ }
+ }
+
+ private List<Queue<Message>> matchWildCard(String routingKey)
+ {
+ List<Queue<Message>> selected = new ArrayList<Queue<Message>>();
+
+ for(String key: topicEx.keySet())
+ {
+ Pattern p = Pattern.compile(key);
+ Matcher m = p.matcher(routingKey);
+ if (m.find())
+ {
+ for(Queue<Message> queue : topicEx.get(key))
+ {
+ selected.add(queue);
+ }
+ }
+ }
+
+ return selected;
+ }
+
+ private void storeMessage(Message msg,List<Queue<Message>> selected)
+ {
+ for(Queue<Message> queue : selected)
+ {
+ queue.offer(msg);
+ }
+ }
+
+}
Modified:
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java?view=diff&rev=564451&r1=564450&r2=564451
==============================================================================
---
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
(original)
+++
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
Thu Aug 9 18:49:45 2007
@@ -1,5 +1,6 @@
package org.apache.qpidity.api;
+import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.qpidity.MessageProperties;
@@ -43,16 +44,16 @@
* </ul>
* @param src
*/
- public void appendData(byte[] src);
+ public void appendData(byte[] src) throws IOException;
- public void appendData(ByteBuffer src);
+ public void appendData(ByteBuffer src) throws IOException;
/**
* This will abstract the underlying message data.
* The Message implementation may not hold all message
* data in memory (especially in the case of large messages)
*
- * The read function might copy data from a
+ * The read function might copy data from
* <ul>
* <li> From memory (Ex: ByteBuffer)
* <li> From Disk
@@ -60,7 +61,8 @@
* </ul>
* @param target
*/
- public void readData(byte[] target);
+ public void readData(byte[] target) throws IOException;
+ public ByteBuffer readData() throws IOException;
}