Author: rajith
Date: Fri Aug 24 15:26:42 2007
New Revision: 569547

URL: http://svn.apache.org/viewvc?rev=569547&view=rev
Log:
Added basic test case to test JMS

Added:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
Modified:
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
    
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java

Added: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java?rev=569547&view=auto
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
 (added)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/JMSTestCase.java
 Fri Aug 24 15:26:42 2007
@@ -0,0 +1,34 @@
+package org.apache.qpidity.client;
+
+import org.apache.qpidity.jms.ConnectionFactoryImpl;
+import org.apache.qpidity.jms.TopicImpl;
+
+public class JMSTestCase
+{
+    public static void main(String[] args)
+    {
+        try
+        {
+            javax.jms.Connection con = (new 
ConnectionFactoryImpl("localhost",5672, "test", 
"guest","guest")).createConnection();
+            con.start();
+            
+            javax.jms.Session ssn = con.createSession(false, 1);
+            
+            javax.jms.Destination dest = new TopicImpl("myTopic");
+            javax.jms.MessageProducer prod = ssn.createProducer(dest);
+            javax.jms.MessageConsumer cons = ssn.createConsumer(dest); 
+            
+            javax.jms.BytesMessage msg = ssn.createBytesMessage();
+            msg.writeInt(123);
+            prod.send(msg);
+            
+            javax.jms.Message m = cons.receive();
+            System.out.println(m);
+            
+        }
+        catch(Exception e)
+        {
+            e.printStackTrace();
+        }
+    }
+}

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
 Fri Aug 24 15:26:42 2007
@@ -87,22 +87,12 @@
     
     public void readData(byte[] target) throws IOException
     {
-        if (_data.size() >0 && _readBuffer == null)
-        {
-            buildReadBuffer();
-        }
-        
-        _readBuffer.get(target);
+        getReadBuffer().get(target);
     }
     
     public ByteBuffer readData() throws IOException
-    {
-        if (_data.size() >0 && _readBuffer == null)
-        {
-            buildReadBuffer();
-        }
-        
-        return _readBuffer;
+    {   
+        return getReadBuffer();
     }
     
     private void buildReadBuffer()
@@ -122,16 +112,39 @@
         }
     }
     
+    private ByteBuffer getReadBuffer() throws IOException
+    {
+        if (_readBuffer != null )
+        {
+           return _readBuffer.slice();
+        }    
+        else
+        {
+            if (_data.size() >0)
+            {
+               buildReadBuffer();
+               return _readBuffer.slice();
+            }
+            else
+            {
+                throw new IOException("No Data to read");
+            }
+        }
+    }
+    
     //hack for testing
     @Override public String toString()
     {
-        if (_data.size() >0 && _readBuffer == null)
+        try
+        {
+            ByteBuffer temp = getReadBuffer();
+            byte[] b = new byte[temp.remaining()];
+            temp.get(b);
+            return new String(b);
+        }
+        catch(IOException e)
         {
-            buildReadBuffer();
+            return "No data";
         }
-        ByteBuffer temp = _readBuffer.duplicate();
-        byte[] b = new byte[temp.remaining()];
-        temp.get(b);
-        return new String(b);
     }
 }

Modified: 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/jms/MessageConsumerImpl.java
 Fri Aug 24 15:26:42 2007
@@ -368,7 +368,9 @@
                 // if this consumer is stopped then this will be call when 
starting
                 requestOneMessage();
                 //When sync() returns we know whether we have received a 
message or not.
+                System.out.println("Internal receive  -- Called sync()");
                 getSession().getQpidSession().sync();
+                System.out.println("Internal receive  -- Returned from 
sync()");
             }
             if (_messageReceived.get() && timeout < 0)
             {
@@ -492,26 +494,32 @@
      * @param message The message delivered to this consumer.
      */
     protected synchronized void onMessage(QpidMessage message)
-    {
+    {        
         try
         {
             // if there is a message selector then we need to evaluate it.
             boolean messageOk = true;
             if (_messageSelector != null)
             {
-                messageOk = _filter.matches((Message) message);
+                messageOk = _filter.matches((Message) message);                
             }
+            
+            System.out.println("Received a message- onMessage in message 
consumer Impl");
             if (!messageOk && _preAcquire)
             {
                 // this is the case for topics
                 // We need to ack this message
+                System.out.println("onMessage - trying to ack message");       
         
                 acknowledgeMessage(message);
+                System.out.println("onMessage - acked message");
             }
             // now we need to acquire this message if needed
             // this is the case of queue with a message selector set
             if (!_preAcquire && messageOk)
             {
+                System.out.println("onMessage - trying to acquire message");
                 messageOk = acquireMessage(message);
+                System.out.println("onMessage - acquired message");
             }
 
             // if this consumer is synchronous then set the current message and
@@ -520,15 +528,17 @@
             {
                 if (_logger.isDebugEnabled())
                 {
-                    _logger.debug("Received a message- onMessage in message 
consumer Impl");
+                    _logger.debug("Received a message- onMessage in message 
consumer Impl");                    
                 }
                 synchronized (_incomingMessageLock)
                 {
+                    System.out.println("got incomming message lock");
                     if (messageOk)
                     {
                         // we have received a proper message that we can 
deliver
                         if (_isReceiving)
                         {
+                            System.out.println("Is receiving true, setting 
message and notifying");
                             _incomingMessage = message;
                             _incomingMessageLock.notify();
                         }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Channel.java
 Fri Aug 24 15:26:42 2007
@@ -140,7 +140,10 @@
             method = m;
         }
 
-        System.out.println("sent " + m);
+        if (m.getEncodedTrack() != Frame.L4)
+        {
+            System.out.println("sent control " + m.getClass().getName());
+        }
     }
 
     public void headers(Struct ... headers)

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/CommandDispatcher.java
 Fri Aug 24 15:26:42 2007
@@ -42,7 +42,7 @@
         Session ssn = event.context;
         Method method = event.target;
         method.setId(ssn.nextCommandId());
-        System.out.println("delegating " + method + "[" + method.getId() + "] 
to " + delegate);
+        System.out.println("\n Delegating " + method.getClass().getName() + 
"[" + method.getId() + "] to " + delegate.getClass().getName() + "\n");
         method.delegate(ssn, delegate);
         if (!method.hasPayload())
         {

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDecoder.java
 Fri Aug 24 15:26:42 2007
@@ -50,7 +50,7 @@
 
     public void handle(Event<C,Segment> event)
     {
-        System.out.println("got method segment:\n  " + event.target);
+        //System.out.println("got method segment:\n  " + event.target);
         Iterator<ByteBuffer> fragments = event.target.getFragments();
         Decoder dec = new FragmentDecoder(major, minor, fragments);
         int type = (int) dec.readLong();

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/MethodDispatcher.java
 Fri Aug 24 15:26:42 2007
@@ -40,7 +40,7 @@
     public void handle(Event<C,Method> event)
     {
         Method method = event.target;
-        System.out.println("delegating " + method + " to " + delegate);
+        System.out.println("\nDelegating " + method.getClass().getName() + " 
to " + delegate.getClass().getName() + "\n");
         method.delegate(event.context, delegate);
     }
 

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/Session.java
 Fri Aug 24 15:26:42 2007
@@ -104,12 +104,19 @@
     }
 
     void flushProcessed()
-    {
+    {           
+        for (Range r: processed)
+        {
+            System.out.println("Completed Range [" + r.getLower() + "," + 
r.getUpper() +"]" );     
+        }
+        System.out.println("Notifying peer with execution complete");        
         executionComplete(0, processed);
     }
 
     void syncPoint()
     {
+        System.out.println("===========Request received to 
sync==========================");
+        
         Range range = new Range(0, getCommandsIn() - 1);
         boolean flush;
         synchronized (processed)
@@ -147,9 +154,11 @@
             for (long id = lower; id <= upper; id++)
             {
                 commands.remove(id);
-            }
+            }  
+             
             if (commands.isEmpty())
             {
+                System.out.println("\n All outstanding commands are completed 
!!!! \n");
                 commands.notifyAll();
             }
         }
@@ -167,7 +176,8 @@
         {
             synchronized (commands)
             {
-                commands.put(commandsOut++, m);
+                System.out.println("sent command " + m.getClass().getName() + 
" command Id" + commandsOut);
+                commands.put(commandsOut++, m);                
             }
         }
         channel.method(m);
@@ -200,6 +210,7 @@
 
     public void sync()
     {
+        System.out.println("calling sync()"); 
         synchronized (commands)
         {
             if (!commands.isEmpty())
@@ -210,7 +221,9 @@
             while (!commands.isEmpty())
             {
                 try {
+                    System.out.println("\n============sync() waiting for 
commmands to be completed ==============\n"); 
                     commands.wait();
+                    System.out.println("\n============sync() got 
notified=========================================\n");
                 }
                 catch (InterruptedException e)
                 {

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/SessionDelegate.java
 Fri Aug 24 15:26:42 2007
@@ -46,7 +46,8 @@
         {
             for (Range range : ranges)
             {
-                ssn.complete(range.getLower(), range.getUpper());
+                System.out.println("completed command range: " + 
range.getLower() + " to " + range.getUpper());
+                ssn.complete(range.getLower(), range.getUpper());              
  
             }
         }
         ssn.complete(excmp.getCumulativeExecutionMark());

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
 Fri Aug 24 15:26:42 2007
@@ -28,6 +28,8 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
 
 
 /**
@@ -44,7 +46,7 @@
     private DeliveryProperties props = null;
     private Struct[] headers = null;
     private List<Frame> frames = null;
-    private Map<String,String> consumers = new HashMap<String,String>();
+    private Map<String,Consumer> consumers = new 
ConcurrentHashMap<String,Consumer>();
     
     public ToyBroker(ToyExchange exchange)
     {
@@ -71,14 +73,30 @@
     
     @Override public void messageSubscribe(Session ssn, MessageSubscribe ms)
     {
-        consumers.put(ms.getDestination(),ms.getQueue());
-        System.out.println("\n==================> message subscribe : " + 
ms.getDestination() + "\n");
-    }    
+        Consumer c = new Consumer();
+        c._queueName = ms.getQueue();
+        consumers.put(ms.getDestination(),c);
+        System.out.println("\n==================> message subscribe : " + 
ms.getDestination() + " queue: " + ms.getQueue()  + "\n");              
+    }   
+    
+    @Override public void messageFlow(Session ssn,MessageFlow struct)
+    {
+        Consumer c = consumers.get(struct.getDestination());
+        c._credit = struct.getValue();
+        System.out.println("\n==================> message flow : " + 
struct.getDestination() + " credit: " + struct.getValue()  + "\n");
+    }
+    
+    @Override public void messageFlush(Session ssn,MessageFlush struct)
+    {
+        System.out.println("\n==================> message flush for consumer : 
" + struct.getDestination() + "\n");
+        checkAndSendMessagesToConsumer(ssn,struct.getDestination());
+    }
 
     @Override public void messageTransfer(Session ssn, MessageTransfer xfr)
     {
         this.xfr = xfr;
-        frames = new ArrayList();
+        frames = new ArrayList<Frame>();
+        System.out.println("received transfer " + xfr.getDestination());
     }
 
     public void headers(Session ssn, Struct ... headers)
@@ -95,6 +113,7 @@
             if (hdr instanceof DeliveryProperties)
             {
                 props = (DeliveryProperties) hdr;
+                System.out.println("received headers routing_key " + 
props.getRoutingKey());
             }
         }
         
@@ -147,7 +166,7 @@
         }
     }
     
-    private void transferMessage(Session ssn,String dest, Message m)
+    private void transferMessageToPeer(Session ssn,String dest, Message m)
     {
         System.out.println("\n==================> Transfering message to: " 
+dest + "\n");
         ssn.messageTransfer(dest, (short)0, (short)0);
@@ -162,15 +181,24 @@
         ssn.endData();
     }
     
-    public void dispatchMessages(Session ssn)
+    private void dispatchMessages(Session ssn)
     {
         for (String dest: consumers.keySet())
         {
-            Message m = exchange.getQueue(consumers.get(dest)).poll();
-            if(m != null)
-            {
-                transferMessage(ssn,dest,m);
-            }
+            checkAndSendMessagesToConsumer(ssn,dest);
+        }
+    }
+    
+    private void checkAndSendMessagesToConsumer(Session ssn,String dest)    
+    {
+        Consumer c = consumers.get(dest);
+        LinkedBlockingQueue<Message> queue = exchange.getQueue(c._queueName);
+        Message m = queue.poll();
+        while (m != null && c._credit>0)
+        {
+            transferMessageToPeer(ssn,dest,m);
+            c._credit--;
+            m = queue.poll();
         }
     }
 
@@ -212,6 +240,15 @@
             return sb.toString();
         }
 
+    }
+    
+    // ugly, but who cares :)
+    // assumes unit is always no of messages, not bytes
+    // assumes it's credit mode and not window
+    private class Consumer
+    {
+        long _credit;
+        String _queueName;
     }
 
     public static final void main(String[] args) throws IOException

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java?rev=569547&r1=569546&r2=569547&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyExchange.java
 Fri Aug 24 15:26:42 2007
@@ -5,7 +5,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Queue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -16,35 +16,35 @@
   final static String DIRECT = "amq.direct";
   final static String TOPIC = "amq.topic";
   
-  private Map<String,List<Queue<Message>>> directEx = new 
HashMap<String,List<Queue<Message>>>();
-  private Map<String,List<Queue<Message>>> topicEx = new 
HashMap<String,List<Queue<Message>>>(); 
-  private Map<String,Queue<Message>> queues = new 
HashMap<String,Queue<Message>>(); 
+  private Map<String,List<LinkedBlockingQueue<Message>>> directEx = new 
HashMap<String,List<LinkedBlockingQueue<Message>>>();
+  private Map<String,List<LinkedBlockingQueue<Message>>> topicEx = new 
HashMap<String,List<LinkedBlockingQueue<Message>>>(); 
+  private Map<String,LinkedBlockingQueue<Message>> queues = new 
HashMap<String,LinkedBlockingQueue<Message>>(); 
   
   public void createQueue(String name)
   {
-      queues.put(name, new LinkedList<Message>());
+      queues.put(name, new LinkedBlockingQueue<Message>());
   }
   
-  public Queue<Message> getQueue(String name)
+  public LinkedBlockingQueue<Message> getQueue(String name)
   {
       return queues.get(name);
   }
   
   public void bindQueue(String type,String binding,String queueName)
   {
-      Queue<Message> queue = queues.get(queueName);
+      LinkedBlockingQueue<Message> queue = queues.get(queueName);
       binding = normalizeKey(binding);
       if(DIRECT.equals(type))
       {
           
           if (directEx.containsKey(binding))
           {
-              List<Queue<Message>> list = directEx.get(binding);
+              List<LinkedBlockingQueue<Message>> list = directEx.get(binding);
               list.add(queue);
           }
           else
           {
-              List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+              List<LinkedBlockingQueue<Message>> list = new 
LinkedList<LinkedBlockingQueue<Message>>();
               list.add(queue);
               directEx.put(binding,list);
           }
@@ -53,12 +53,12 @@
       {
           if (topicEx.containsKey(binding))
           {
-              List<Queue<Message>> list = topicEx.get(binding);
+              List<LinkedBlockingQueue<Message>> list = topicEx.get(binding);
               list.add(queue);
           }
           else
           {
-              List<Queue<Message>> list = new LinkedList<Queue<Message>>();
+              List<LinkedBlockingQueue<Message>> list = new 
LinkedList<LinkedBlockingQueue<Message>>();
               list.add(queue);
               topicEx.put(binding,list);
           }
@@ -67,7 +67,7 @@
   
   public boolean route(String dest,String routingKey,Message msg)
   {
-      List<Queue<Message>> queues;
+      List<LinkedBlockingQueue<Message>> queues;
       if(DIRECT.equals(dest))
       {
           queues = directEx.get(routingKey);          
@@ -101,9 +101,9 @@
       }
   }
   
-  private List<Queue<Message>> matchWildCard(String routingKey)
+  private List<LinkedBlockingQueue<Message>> matchWildCard(String routingKey)
   {        
-      List<Queue<Message>> selected = new ArrayList<Queue<Message>>();
+      List<LinkedBlockingQueue<Message>> selected = new 
ArrayList<LinkedBlockingQueue<Message>>();
       
       for(String key: topicEx.keySet())
       {
@@ -111,7 +111,7 @@
           Matcher m = p.matcher(routingKey);
           if (m.find())
           {
-              for(Queue<Message> queue : topicEx.get(key))
+              for(LinkedBlockingQueue<Message> queue : topicEx.get(key))
               {
                     selected.add(queue);
               }
@@ -121,9 +121,9 @@
       return selected;      
   }
 
-  private void storeMessage(Message msg,List<Queue<Message>> selected)
+  private void storeMessage(Message msg,List<LinkedBlockingQueue<Message>> 
selected)
   {
-      for(Queue<Message> queue : selected)
+      for(LinkedBlockingQueue<Message> queue : selected)
       {
           queue.offer(msg);
       }


Reply via email to