Author: fhanik
Date: Thu May 17 06:45:15 2007
New Revision: 538920

URL: http://svn.apache.org/viewvc?view=rev&rev=538920
Log:
Add locks instead of synchronized statements to avoid issues between receiving 
and sending

Modified:
    
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
    
tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java

Modified: 
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?view=diff&rev=538920&r1=538919&r2=538920
==============================================================================
--- 
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
 (original)
+++ 
tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java
 Thu May 17 06:45:15 2007
@@ -25,6 +25,7 @@
 import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.io.XByteBuffer;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 
 
@@ -59,36 +60,49 @@
     private long expire = 3000;
     private boolean forwardExpired = true;
     private int maxQueue = Integer.MAX_VALUE;
+    
+    ReentrantReadWriteLock inLock = new ReentrantReadWriteLock(true);
+    ReentrantReadWriteLock outLock= new ReentrantReadWriteLock(true);
 
-    public synchronized void sendMessage(Member[] destination, ChannelMessage 
msg, InterceptorPayload payload) throws ChannelException {
+    public void sendMessage(Member[] destination, ChannelMessage msg, 
InterceptorPayload payload) throws ChannelException {
         if ( !okToProcess(msg.getOptions()) ) {
             super.sendMessage(destination, msg, payload);
             return;
         }
-        for ( int i=0; i<destination.length; i++ ) {
-            int nr = incCounter(destination[i]);
-            //reduce byte copy
-            msg.getMessage().append(nr);
-            try {
-                getNext().sendMessage(new Member[] {destination[i]}, msg, 
payload);
-            }finally {
-                msg.getMessage().trim(4);
+        try {
+            outLock.writeLock().lock();
+            for ( int i=0; i<destination.length; i++ ) {
+                int nr = incCounter(destination[i]);
+                //reduce byte copy
+                msg.getMessage().append(nr);
+                try {
+                    getNext().sendMessage(new Member[] {destination[i]}, msg, 
payload);
+                }finally {
+                    msg.getMessage().trim(4);
+                }
             }
+        }finally {
+            outLock.writeLock().unlock();
         }
     }
 
-    public synchronized void messageReceived(ChannelMessage msg) {
+    public void messageReceived(ChannelMessage msg) {
         if ( !okToProcess(msg.getOptions()) ) {
             super.messageReceived(msg);
             return;
         }
-        int msgnr = 
XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
-        msg.getMessage().trim(4);
-        MessageOrder order = new 
MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
-        if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false);
-    }
+        try {
+            inLock.writeLock().lock();
+            int msgnr = 
XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4);
+            msg.getMessage().trim(4);
+            MessageOrder order = new 
MessageOrder(msgnr,(ChannelMessage)msg.deepclone());
+            if ( processIncoming(order) ) 
processLeftOvers(msg.getAddress(),false);
     
-    public void processLeftOvers(Member member, boolean force) {
+        }finally {
+            inLock.writeLock().unlock();
+        }
+    }
+    protected void processLeftOvers(Member member, boolean force) {
         MessageOrder tmp = (MessageOrder)incoming.get(member);
         if ( force ) {
             Counter cnt = getInCounter(member);
@@ -101,7 +115,7 @@
      * @param order MessageOrder
      * @return boolean - true if a message expired and was processed
      */
-    public boolean processIncoming(MessageOrder order) {
+    protected boolean processIncoming(MessageOrder order) {
         boolean result = false;
         Member member = order.getMessage().getAddress();
         Counter cnt = getInCounter(member);
@@ -147,28 +161,50 @@
         return result;
     }
     
-    public synchronized void memberAdded(Member member) {
+    public void memberAdded(Member member) {
+        //reset counters
+        try {
+            inLock.writeLock().lock();
+            getInCounter(member);
+        }finally {
+            inLock.writeLock().unlock();
+        }
+        try {
+            outLock.writeLock().lock();
+            getOutCounter(member);
+        }finally {
+            outLock.writeLock().unlock();
+        }
         //notify upwards
-        getInCounter(member);
-        getOutCounter(member);
         super.memberAdded(member);
     }
 
-    public synchronized void memberDisappeared(Member member) {
-        //notify upwards
-        outcounter.remove(member);
-        incounter.remove(member);
+    public void memberDisappeared(Member member) {
+        //reset counters
+        try {
+            inLock.writeLock().lock();
+            incounter.remove(member);
+        }finally {
+            inLock.writeLock().unlock();
+        }
+        try {
+            outLock.writeLock().lock();
+            outcounter.remove(member);
+        }finally {
+            outLock.writeLock().unlock();
+        }
         //clear the remaining queue
         processLeftOvers(member,true);
+        //notify upwards
         super.memberDisappeared(member);
     }
     
-    public int incCounter(Member mbr) { 
+    protected int incCounter(Member mbr) { 
         Counter cnt = getOutCounter(mbr);
         return cnt.inc();
     }
     
-    public Counter getInCounter(Member mbr) {
+    protected Counter getInCounter(Member mbr) {
         Counter cnt = (Counter)incounter.get(mbr);
         if ( cnt == null ) {
             cnt = new Counter();
@@ -178,7 +214,7 @@
         return cnt;
     }
 
-    public Counter getOutCounter(Member mbr) {
+    protected Counter getOutCounter(Member mbr) {
         Counter cnt = (Counter)outcounter.get(mbr);
         if ( cnt == null ) {
             cnt = new Counter();
@@ -187,7 +223,7 @@
         return cnt;
     }
 
-    public static class Counter {
+    protected static class Counter {
         private AtomicInteger value = new AtomicInteger(0);
         
         public int getCounter() {
@@ -203,7 +239,7 @@
         }
     }
     
-    public static class MessageOrder {
+    protected static class MessageOrder {
         private long received = System.currentTimeMillis();
         private MessageOrder next;
         private int msgNr;

Modified: 
tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java
URL: 
http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java?view=diff&rev=538920&r1=538919&r2=538920
==============================================================================
--- 
tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java
 (original)
+++ 
tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java
 Thu May 17 06:45:15 2007
@@ -31,6 +31,7 @@
 import org.apache.catalina.tribes.ChannelMessage;
 import org.apache.catalina.tribes.group.InterceptorPayload;
 import org.apache.catalina.tribes.ChannelException;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class TestOrderInterceptor extends TestCase {
 
@@ -76,8 +77,9 @@
     
     public void testOrder1() throws Exception {
         Member[] dest = channels[0].getMembers();
+        final AtomicInteger value = new AtomicInteger(0);
         for ( int i=0; i<100; i++ ) {
-            channels[0].send(dest,new Integer(i),0);
+            channels[0].send(dest,new Integer(value.getAndAdd(1)),0);
         }
         Thread.sleep(5000);
         for ( int i=0; i<test.length; i++ ) {
@@ -85,6 +87,40 @@
         }
     }
     
+    public void testOrder2() throws Exception {
+        final Member[] dest = channels[0].getMembers();
+        final AtomicInteger value = new AtomicInteger(0);
+        Runnable run = new Runnable() {
+            public void run() {
+                for (int i = 0; i < 100; i++) {
+                    try {
+                        synchronized (channels[0]) {
+                            channels[0].send(dest, new 
Integer(value.getAndAdd(1)), 0);
+                        }
+                    }catch ( Exception x ) {
+                        x.printStackTrace();
+                        assertEquals(true,false);
+                    }
+                }
+            }
+        };
+        Thread[] threads = new Thread[5];
+        for (int i=0;i<threads.length;i++) {
+            threads[i] = new Thread(run);
+        }
+        for (int i=0;i<threads.length;i++) {
+            threads[i].start();
+        }
+        for (int i=0;i<threads.length;i++) {
+            threads[i].join();
+        }
+        Thread.sleep(5000);
+        for ( int i=0; i<test.length; i++ ) {
+            super.assertEquals(false,test[i].fail);
+        }
+    }
+
+
     protected void tearDown() throws Exception {
         System.out.println("tearDown");
         super.tearDown();
@@ -112,7 +148,7 @@
             Integer i = (Integer)msg;
             if ( i.intValue() != cnt ) fail = true;
             else cnt++;
-            System.out.println("Listener["+id+"] Message received:"+i+" 
Count:"+total);
+            System.out.println("Listener["+id+"] Message received:"+i+" 
Count:"+total+" Fail:"+fail);
 
         }
 



---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to