Author: rhs
Date: Wed Oct 22 18:21:22 2008
New Revision: 707241

URL: http://svn.apache.org/viewvc?rev=707241&view=rev
Log:
QPID-1339: support for low level session resume

Modified:
    incubator/qpid/trunk/qpid/java/common/Composite.tpl
    incubator/qpid/trunk/qpid/java/common/build.xml
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
    
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
    
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java

Modified: incubator/qpid/trunk/qpid/java/common/Composite.tpl
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/Composite.tpl?rev=707241&r1=707240&r2=707241&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/Composite.tpl (original)
+++ incubator/qpid/trunk/qpid/java/common/Composite.tpl Wed Oct 22 18:21:22 2008
@@ -13,6 +13,9 @@
 
 import org.apache.qpid.transport.network.Frame;
 
+import org.apache.qpid.util.Strings;
+
+
 ${
 from genutil import *
 
@@ -227,6 +230,26 @@
         setBody(body);
         return this;
     }
+
+    public final byte[] getBodyBytes() {
+        ByteBuffer buf = getBody();
+        byte[] bytes = new byte[buf.remaining()];
+        buf.get(bytes);
+        return bytes;
+    }
+
+    public final void setBody(byte[] body)
+    {
+        setBody(ByteBuffer.wrap(body));
+    }
+
+    public final String getBodyString() {
+        return Strings.fromUTF8(getBodyBytes());
+    }
+
+    public final void setBody(String body) {
+        setBody(Strings.toUTF8(body));
+    }
 """)
 }
 

Modified: incubator/qpid/trunk/qpid/java/common/build.xml
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/build.xml?rev=707241&r1=707240&r2=707241&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/build.xml (original)
+++ incubator/qpid/trunk/qpid/java/common/build.xml Wed Oct 22 18:21:22 2008
@@ -19,9 +19,6 @@
  -
  -->
 <project name="AMQ Common" default="build">
- 
-  <!-- Disabled ConnectionTest due to QPID-1359 -->
-  <property name="module.test.excludes" value="**/ConnectionTest.java"/>
 
   <import file="../module.xml"/>
 

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=707241&r1=707240&r2=707241&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
 Wed Oct 22 18:21:22 2008
@@ -53,10 +53,10 @@
     implements Receiver<ProtocolEvent>, Sender<ProtocolEvent>
 {
 
-    enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
-
     private static final Logger log = Logger.get(Connection.class);
 
+    enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD }
+
     class DefaultConnectionListener implements ConnectionListener
     {
         public void opened(Connection conn) {}
@@ -202,9 +202,9 @@
         return createSession(0);
     }
 
-    public Session createSession(long timeout)
+    public Session createSession(long expiry)
     {
-        return createSession(UUID.randomUUID().toString(), timeout);
+        return createSession(UUID.randomUUID().toString(), expiry);
     }
 
     public Session createSession(String name)
@@ -212,25 +212,24 @@
         return createSession(name, 0);
     }
 
-    public Session createSession(String name, long timeout)
+    public Session createSession(String name, long expiry)
     {
-        return createSession(Strings.toUTF8(name), timeout);
+        return createSession(Strings.toUTF8(name), expiry);
     }
 
-    public Session createSession(byte[] name, long timeout)
+    public Session createSession(byte[] name, long expiry)
     {
-        return createSession(new Binary(name), timeout);
+        return createSession(new Binary(name), expiry);
     }
 
-    public Session createSession(Binary name, long timeout)
+    public Session createSession(Binary name, long expiry)
     {
         synchronized (lock)
         {
-            Session ssn = new Session(this, name);
+            Session ssn = new Session(this, name, expiry);
             sessions.put(name, ssn);
             map(ssn);
-            ssn.sessionAttach(name.getBytes());
-            ssn.sessionRequestTimeout(timeout);
+            ssn.attach();
             return ssn;
         }
     }
@@ -349,6 +348,19 @@
         }
     }
 
+    public void resume()
+    {
+        synchronized (lock)
+        {
+            for (Session ssn : sessions.values())
+            {
+                map(ssn);
+                ssn.attach();
+                ssn.resume();
+            }
+        }
+    }
+
     public void exception(ConnectionException e)
     {
         synchronized (lock)

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java?rev=707241&r1=707240&r2=707241&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Echo.java
 Wed Oct 22 18:21:22 2008
@@ -39,8 +39,9 @@
 
     public void message(Session ssn, MessageTransfer xfr)
     {
+        int id = xfr.getId();
         ssn.invoke(xfr);
-        ssn.processed(xfr);
+        ssn.processed(id);
     }
 
     public void exception(Session ssn, SessionException exc)

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=707241&r1=707240&r2=707241&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
 Wed Oct 22 18:21:22 2008
@@ -140,7 +140,7 @@
 
     public Session getSession(Connection conn, SessionAttach atc)
     {
-        return new Session(conn, new Binary(atc.getName()));
+        return new Session(conn, new Binary(atc.getName()), 0);
     }
 
     @Override public void sessionAttach(Connection conn, SessionAttach atc)

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java?rev=707241&r1=707240&r2=707241&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/Session.java
 Wed Oct 22 18:21:22 2008
@@ -24,6 +24,7 @@
 import org.apache.qpid.transport.network.Frame;
 
 import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
 
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
@@ -34,6 +35,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.qpid.transport.Option.*;
+import static org.apache.qpid.transport.Session.State.*;
 import static org.apache.qpid.transport.util.Functions.*;
 import static org.apache.qpid.util.Serial.*;
 import static org.apache.qpid.util.Strings.*;
@@ -49,6 +51,8 @@
 
     private static final Logger log = Logger.get(Session.class);
 
+    enum State { NEW, DETACHED, OPEN, CLOSING, CLOSED }
+
     class DefaultSessionListener implements SessionListener
     {
 
@@ -69,49 +73,43 @@
 
     public static final int UNLIMITED_CREDIT = 0xFFFFFFFF;
 
-    private static boolean ENABLE_REPLAY = false;
-
-    static
-    {
-        String enableReplay = "enable_command_replay";
-        try
-        {
-            ENABLE_REPLAY  = new 
Boolean(System.getProperties().getProperty(enableReplay, "false"));
-        }
-        catch (Exception e)
-        {
-            ENABLE_REPLAY = false;
-        }
-    }
-
     private Connection connection;
     private Binary name;
+    private long expiry;
     private int channel;
     private SessionDelegate delegate = new SessionDelegate();
     private SessionListener listener = new DefaultSessionListener();
     private long timeout = 60000;
     private boolean autoSync = false;
 
+    private boolean incomingInit;
     // incoming command count
-    int commandsIn = 0;
+    private int commandsIn;
     // completed incoming commands
     private final Object processedLock = new Object();
-    private RangeSet processed = new RangeSet();
-    private int maxProcessed = commandsIn - 1;
-    private int syncPoint = maxProcessed;
+    private RangeSet processed;
+    private int maxProcessed;
+    private int syncPoint;
 
     // outgoing command count
     private int commandsOut = 0;
-    private Map<Integer,Method> commands = new HashMap<Integer,Method>();
+    private Method[] commands = new Method[64*1024];
     private int maxComplete = commandsOut - 1;
     private boolean needSync = false;
 
-    private AtomicBoolean closed = new AtomicBoolean(false);
+    private State state = NEW;
 
-    Session(Connection connection, Binary name)
+    Session(Connection connection, Binary name, long expiry)
     {
         this.connection = connection;
         this.name = name;
+        this.expiry = expiry;
+        initReceiver();
+    }
+
+    public Connection getConnection()
+    {
+        return connection;
     }
 
     public Binary getName()
@@ -119,6 +117,11 @@
         return name;
     }
 
+    void setExpiry(long expiry)
+    {
+        this.expiry = expiry;
+    }
+
     int getChannel()
     {
         return channel;
@@ -154,9 +157,63 @@
         }
     }
 
-    public Map<Integer,Method> getOutstandingCommands()
+    private void initReceiver()
+    {
+        synchronized (processedLock)
+        {
+            incomingInit = false;
+            processed = new RangeSet();
+        }
+    }
+
+    void attach()
     {
-        return commands;
+        initReceiver();
+        sessionAttach(name.getBytes());
+        sessionRequestTimeout(expiry);
+    }
+
+    void resume()
+    {
+        synchronized (commands)
+        {
+            for (int i = maxComplete + 1; lt(i, commandsOut); i++)
+            {
+                Method m = commands[mod(i, commands.length)];
+                if (m != null)
+                {
+                    sessionCommandPoint(m.getId(), 0);
+                    send(m);
+                }
+            }
+        }
+    }
+
+    void dump()
+    {
+        synchronized (commands)
+        {
+            for (Method m : commands)
+            {
+                if (m != null)
+                {
+                    System.out.println(m);
+                }
+            }
+        }
+    }
+
+    final void commandPoint(int id)
+    {
+        synchronized (processedLock)
+        {
+            this.commandsIn = id;
+            if (!incomingInit)
+            {
+                maxProcessed = commandsIn - 1;
+                syncPoint = maxProcessed;
+            }
+        }
     }
 
     public int getCommandsOut()
@@ -209,11 +266,12 @@
 
     public void processed(Range range)
     {
-        log.debug("%s processed(%s)", this, range);
+        log.debug("%s processed(%s) %s %s", this, range, syncPoint, 
maxProcessed);
 
         boolean flush;
         synchronized (processedLock)
         {
+            log.debug("%s", processed);
             processed.add(range);
             Range first = processed.getFirst();
             int lower = first.getLower();
@@ -281,14 +339,6 @@
         }
     }
 
-    public Method getCommand(int id)
-    {
-        synchronized (commands)
-        {
-            return commands.get(id);
-        }
-    }
-
     boolean complete(int lower, int upper)
     {
         //avoid autoboxing
@@ -301,13 +351,13 @@
             int old = maxComplete;
             for (int id = max(maxComplete, lower); le(id, upper); id++)
             {
-                commands.remove(id);
+                commands[mod(id, commands.length)] = null;
             }
             if (le(lower, maxComplete + 1))
             {
                 maxComplete = max(maxComplete, upper);
             }
-            log.debug("%s   commands remaining: %s", this, commands);
+            log.debug("%s   commands remaining: %s", this, commandsOut - 
maxComplete);
             commands.notifyAll();
             return gt(maxComplete, old);
         }
@@ -329,38 +379,47 @@
         }
     }
 
-    public void invoke(Method m)
+    final private boolean isFull(int id)
     {
-        if (closed.get())
-        {
-            ExecutionException exc = getException();
-            if (exc != null)
-            {
-                throw new SessionException(exc);
-            }
-            else if (close != null)
-            {
-                throw new ConnectionException(close);
-            }
-            else
-            {
-                throw new SessionClosedException();
-            }
-        }
+        return id - maxComplete >= commands.length;
+    }
 
+    public void invoke(Method m)
+    {
         if (m.getEncodedTrack() == Frame.L4)
         {
             synchronized (commands)
             {
+                if (state == CLOSED)
+                {
+                    throw new SessionClosedException();
+                }
+
                 int next = commandsOut++;
                 m.setId(next);
+
+                if (isFull(next))
+                {
+                    Waiter w = new Waiter(commands, timeout);
+                    while (w.hasTime() && isFull(next))
+                    {
+                        sessionFlush(COMPLETED);
+                        w.await();
+                    }
+                }
+
+                if (isFull(next))
+                {
+                    throw new SessionException("timed out waiting for 
completion");
+                }
+
                 if (next == 0)
                 {
                     sessionCommandPoint(0, 0);
                 }
-                if (ENABLE_REPLAY)
+                if (expiry > 0)
                 {
-                    commands.put(next, m);
+                    commands[mod(next, commands.length)] = m;
                 }
                 if (autoSync)
                 {
@@ -404,31 +463,23 @@
                 executionSync(SYNC);
             }
 
-            long start = System.currentTimeMillis();
-            long elapsed = 0;
-            while (!closed.get() && elapsed < timeout && lt(maxComplete, 
point))
-            {
-                try {
-                    log.debug("%s   waiting for[%d]: %d, %s", this, point,
-                              maxComplete, commands);
-                    commands.wait(timeout - elapsed);
-                    elapsed = System.currentTimeMillis() - start;
-                }
-                catch (InterruptedException e)
-                {
-                    throw new RuntimeException(e);
-                }
+            Waiter w = new Waiter(commands, timeout);
+            while (w.hasTime() && state != CLOSED && lt(maxComplete, point))
+            {
+                log.debug("%s   waiting for[%d]: %d, %s", this, point,
+                          maxComplete, commands);
+                w.await();
             }
 
             if (lt(maxComplete, point))
             {
-                if (closed.get())
+                if (state == CLOSED)
                 {
                     throw new SessionException(getException());
                 }
                 else
                 {
-                    throw new RuntimeException
+                    throw new SessionException
                         (String.format
                          ("timed out waiting for sync: complete = %s, point = 
%s", maxComplete, point));
                 }
@@ -518,20 +569,11 @@
         {
             synchronized (this)
             {
-                long start = System.currentTimeMillis();
-                long elapsed = 0;
-                while (!closed.get() && timeout - elapsed > 0 && !isDone())
+                Waiter w = new Waiter(this, timeout);
+                while (w.hasTime() && state != CLOSED && !isDone())
                 {
-                    try
-                    {
-                        log.debug("%s waiting for result: %s", Session.this, 
this);
-                        wait(timeout - elapsed);
-                        elapsed = System.currentTimeMillis() - start;
-                    }
-                    catch (InterruptedException e)
-                    {
-                        throw new RuntimeException(e);
-                    }
+                    log.debug("%s waiting for result: %s", Session.this, this);
+                    w.await();
                 }
             }
 
@@ -539,13 +581,15 @@
             {
                 return result;
             }
-            else if (closed.get())
+            else if (state == CLOSED)
             {
                 throw new SessionException(getException());
             }
             else
             {
-                return null;
+                throw new SessionException
+                    (String.format("%s timed out waiting for result: %s",
+                                   Session.this, this));
             }
         }
 
@@ -588,32 +632,24 @@
 
     public void close()
     {
-        sessionRequestTimeout(0);
-        sessionDetach(name.getBytes());
         synchronized (commands)
         {
-            long start = System.currentTimeMillis();
-            long elapsed = 0;
-            try
+            state = CLOSING;
+            sessionRequestTimeout(0);
+            sessionDetach(name.getBytes());
+            Waiter w = new Waiter(commands, timeout);
+            while (w.hasTime() && state != CLOSED)
             {
-                while (!closed.get() && elapsed < timeout)
-                {
-                    commands.wait(timeout - elapsed);
-                    elapsed = System.currentTimeMillis() - start;
-                }
-
-                if (!closed.get())
-                {
-                    throw new SessionException("close() timed out");
-                }
+                w.await();
             }
-            catch (InterruptedException e)
+
+            if (state != CLOSED)
             {
-                throw new RuntimeException(e);
+                throw new SessionException("close() timed out");
             }
-        }
 
-        connection.removeSession(this);
+            connection.removeSession(this);
+        }
     }
 
     public void exception(Throwable t)
@@ -623,18 +659,27 @@
 
     public void closed()
     {
-        closed.set(true);
         synchronized (commands)
         {
+            if (expiry == 0)
+            {
+                state = CLOSED;
+            }
+            else
+            {
+                state = DETACHED;
+            }
+
             commands.notifyAll();
-        }
-        synchronized (results)
-        {
-            for (ResultFuture<?> result : results.values())
+
+            synchronized (results)
             {
-                synchronized(result)
+                for (ResultFuture<?> result : results.values())
                 {
-                    result.notifyAll();
+                    synchronized(result)
+                    {
+                        result.notifyAll();
+                    }
                 }
             }
         }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java?rev=707241&r1=707240&r2=707241&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/SessionDelegate.java
 Wed Oct 22 18:21:22 2008
@@ -57,7 +57,10 @@
         log.warn("UNHANDLED: [%s] %s", ssn, method);
     }
 
-    @Override public void sessionTimeout(Session ssn, SessionTimeout t) {}
+    @Override public void sessionTimeout(Session ssn, SessionTimeout t)
+    {
+        ssn.setExpiry(t.getTimeout());
+    }
 
     @Override public void sessionCompleted(Session ssn, SessionCompleted cmp)
     {
@@ -113,7 +116,7 @@
 
     @Override public void sessionCommandPoint(Session ssn, SessionCommandPoint 
scp)
     {
-        ssn.commandsIn = scp.getCommandId();
+        ssn.commandPoint(scp.getCommandId());
     }
 
     @Override public void executionSync(Session ssn, ExecutionSync sync)

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java?rev=707241&r1=707240&r2=707241&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/Disassembler.java
 Wed Oct 22 18:21:22 2008
@@ -208,11 +208,14 @@
         if (payload)
         {
             final Header hdr = method.getHeader();
-            final Struct[] structs = hdr.getStructs();
-
-            for (Struct st : structs)
+            if (hdr != null)
             {
-                enc.writeStruct32(st);
+                final Struct[] structs = hdr.getStructs();
+
+                for (Struct st : structs)
+                {
+                    enc.writeStruct32(st);
+                }
             }
             headerSeg = enc.segment();
         }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java?rev=707241&r1=707240&r2=707241&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/network/io/IoSender.java
 Wed Oct 22 18:21:22 2008
@@ -88,12 +88,6 @@
         return result;
     }
 
-    private static final int mod(int n, int m)
-    {
-        int r = n % m;
-        return r < 0 ? m + r : r;
-    }
-
     public void send(ByteBuffer buf)
     {
         if (closed.get())

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java?rev=707241&r1=707240&r2=707241&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/transport/util/Functions.java
 Wed Oct 22 18:21:22 2008
@@ -34,6 +34,12 @@
 public class Functions
 {
 
+    public static final int mod(int n, int m)
+    {
+        int r = n % m;
+        return r < 0 ? m + r : r;
+    }
+
     public static final byte lsb(int i)
     {
         return (byte) (0xFF & i);

Modified: 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java?rev=707241&r1=707240&r2=707241&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/Strings.java
 Wed Oct 22 18:21:22 2008
@@ -79,4 +79,16 @@
         }
     }
 
+    public static final String fromUTF8(byte[] bytes)
+    {
+        try
+        {
+            return new String(bytes, "UTF-8");
+        }
+        catch (UnsupportedEncodingException e)
+        {
+            throw new RuntimeException(e);
+        }
+    }
+
 }

Modified: 
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
URL: 
http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java?rev=707241&r1=707240&r2=707241&view=diff
==============================================================================
--- 
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
 (original)
+++ 
incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/transport/ConnectionTest.java
 Wed Oct 22 18:21:22 2008
@@ -28,21 +28,26 @@
 import org.apache.qpid.transport.network.io.IoAcceptor;
 import org.apache.qpid.transport.network.io.IoTransport;
 import org.apache.qpid.transport.util.Logger;
+import org.apache.qpid.transport.util.Waiter;
 
 import junit.framework.TestCase;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Random;
 
 /**
  * ConnectionTest
  */
 
-public class ConnectionTest extends TestCase
+public class ConnectionTest extends TestCase implements SessionListener
 {
 
     private static final Logger log = Logger.get(ConnectionTest.class);
 
     private int port;
+    private volatile boolean queue = false;
+    private List<MessageTransfer> messages = new ArrayList<MessageTransfer>();
 
     protected void setUp() throws Exception
     {
@@ -51,10 +56,11 @@
         port = AvailablePortFinder.getNextAvailable(12000);
 
         ConnectionDelegate server = new ServerDelegate() {
-            @Override public void connectionOpen(Connection conn, 
ConnectionOpen open)
+            @Override public Session getSession(Connection conn, SessionAttach 
atc)
             {
-                super.connectionOpen(conn, open);
-                conn.close();
+                Session ssn = super.getSession(conn, atc);
+                ssn.setSessionListener(ConnectionTest.this);
+                return ssn;
             }
         };
 
@@ -63,6 +69,58 @@
         ioa.start();
     }
 
+    public void opened(Session ssn) {}
+
+    public void message(Session ssn, MessageTransfer xfr)
+    {
+        if (queue)
+        {
+            messages.add(xfr);
+            ssn.processed(xfr);
+            return;
+        }
+
+        String body = xfr.getBodyString();
+
+        if (body.startsWith("CLOSE"))
+        {
+            ssn.getConnection().close();
+        }
+        else if (body.startsWith("ECHO"))
+        {
+            int id = xfr.getId();
+            ssn.invoke(xfr);
+            ssn.processed(id);
+        }
+        else if (body.startsWith("SINK"))
+        {
+            ssn.processed(xfr);
+        }
+        else if (body.startsWith("DROP"))
+        {
+            // do nothing
+        }
+        else
+        {
+            throw new IllegalArgumentException
+                ("unrecognized message: " + body);
+        }
+    }
+
+    public void exception(Session ssn, SessionException exc)
+    {
+        throw exc;
+    }
+
+    public void closed(Session ssn) {}
+
+    private void send(Session ssn, String msg)
+    {
+        ssn.messageTransfer
+            ("xxx", MessageAcceptMode.NONE, MessageAcquireMode.PRE_ACQUIRED,
+             null, msg);
+    }
+
     private Connection connect(final Condition closed)
     {
         Connection conn = new Connection();
@@ -89,6 +147,10 @@
     {
         Condition closed = new Condition();
         Connection conn = connect(closed);
+
+        Session ssn = conn.createSession();
+        send(ssn, "CLOSE");
+
         if (!closed.get(3000))
         {
             fail("never got notified of connection close");
@@ -105,4 +167,88 @@
         }
     }
 
+    public void testResume() throws Exception
+    {
+        Connection conn = new Connection();
+        conn.connect("localhost", port, null, "guest", "guest");
+
+        conn.setConnectionListener(new ConnectionListener()
+        {
+            public void opened(Connection conn) {}
+            public void exception(Connection conn, ConnectionException e)
+            {
+                throw e;
+            }
+            public void closed(Connection conn)
+            {
+                queue = true;
+                conn.connect("localhost", port, null, "guest", "guest");
+                conn.resume();
+            }
+        });
+
+        Session ssn = conn.createSession(1);
+        final List<MessageTransfer> incoming = new 
ArrayList<MessageTransfer>();
+        ssn.setSessionListener(new SessionListener()
+        {
+            public void opened(Session s) {}
+            public void exception(Session s, SessionException e) {}
+            public void message(Session s, MessageTransfer xfr)
+            {
+                synchronized (incoming)
+                {
+                    incoming.add(xfr);
+                    incoming.notifyAll();
+                }
+
+                s.processed(xfr);
+            }
+            public void closed(Session s) {}
+        });
+
+        send(ssn, "SINK 0");
+        send(ssn, "ECHO 1");
+        send(ssn, "ECHO 2");
+
+        ssn.sync();
+
+        String[] msgs = { "DROP 3", "DROP 4", "DROP 5", "CLOSE 6", "SINK 7" };
+        for (String m : msgs)
+        {
+            send(ssn, m);
+        }
+
+        ssn.sync();
+
+        assertEquals(msgs.length, messages.size());
+        for (int i = 0; i < msgs.length; i++)
+        {
+            assertEquals(msgs[i], messages.get(i).getBodyString());
+        }
+
+        queue = false;
+
+        send(ssn, "ECHO 8");
+        send(ssn, "ECHO 9");
+
+        synchronized (incoming)
+        {
+            Waiter w = new Waiter(incoming, 30000);
+            while (w.hasTime() && incoming.size() < 4)
+            {
+                w.await();
+            }
+
+            assertEquals(4, incoming.size());
+            assertEquals("ECHO 1", incoming.get(0).getBodyString());
+            assertEquals(0, incoming.get(0).getId());
+            assertEquals("ECHO 2", incoming.get(1).getBodyString());
+            assertEquals(1, incoming.get(1).getId());
+            assertEquals("ECHO 8", incoming.get(2).getBodyString());
+            assertEquals(0, incoming.get(0).getId());
+            assertEquals("ECHO 9", incoming.get(3).getBodyString());
+            assertEquals(1, incoming.get(1).getId());
+        }
+    }
+
 }


Reply via email to