Author: fhanik Date: Thu May 17 06:45:15 2007 New Revision: 538920 URL: http://svn.apache.org/viewvc?view=rev&rev=538920 Log: Add locks instead of synchronized statements to avoid issues between receiving and sending
Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java Modified: tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java?view=diff&rev=538920&r1=538919&r2=538920 ============================================================================== --- tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java (original) +++ tomcat/tc6.0.x/trunk/java/org/apache/catalina/tribes/group/interceptors/OrderInterceptor.java Thu May 17 06:45:15 2007 @@ -25,6 +25,7 @@ import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.catalina.tribes.io.XByteBuffer; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -59,36 +60,49 @@ private long expire = 3000; private boolean forwardExpired = true; private int maxQueue = Integer.MAX_VALUE; + + ReentrantReadWriteLock inLock = new ReentrantReadWriteLock(true); + ReentrantReadWriteLock outLock= new ReentrantReadWriteLock(true); - public synchronized void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { + public void sendMessage(Member[] destination, ChannelMessage msg, InterceptorPayload payload) throws ChannelException { if ( !okToProcess(msg.getOptions()) ) { super.sendMessage(destination, msg, payload); return; } - for ( int i=0; i<destination.length; i++ ) { - int nr = incCounter(destination[i]); - //reduce byte copy - msg.getMessage().append(nr); - try { - getNext().sendMessage(new Member[] {destination[i]}, msg, payload); - }finally { - msg.getMessage().trim(4); + try { + outLock.writeLock().lock(); + for ( int i=0; i<destination.length; i++ ) { + int nr = incCounter(destination[i]); + //reduce byte copy + msg.getMessage().append(nr); + try { + getNext().sendMessage(new Member[] {destination[i]}, msg, payload); + }finally { + msg.getMessage().trim(4); + } } + }finally { + outLock.writeLock().unlock(); } } - public synchronized void messageReceived(ChannelMessage msg) { + public void messageReceived(ChannelMessage msg) { if ( !okToProcess(msg.getOptions()) ) { super.messageReceived(msg); return; } - int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4); - msg.getMessage().trim(4); - MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone()); - if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false); - } + try { + inLock.writeLock().lock(); + int msgnr = XByteBuffer.toInt(msg.getMessage().getBytesDirect(),msg.getMessage().getLength()-4); + msg.getMessage().trim(4); + MessageOrder order = new MessageOrder(msgnr,(ChannelMessage)msg.deepclone()); + if ( processIncoming(order) ) processLeftOvers(msg.getAddress(),false); - public void processLeftOvers(Member member, boolean force) { + }finally { + inLock.writeLock().unlock(); + } + } + protected void processLeftOvers(Member member, boolean force) { MessageOrder tmp = (MessageOrder)incoming.get(member); if ( force ) { Counter cnt = getInCounter(member); @@ -101,7 +115,7 @@ * @param order MessageOrder * @return boolean - true if a message expired and was processed */ - public boolean processIncoming(MessageOrder order) { + protected boolean processIncoming(MessageOrder order) { boolean result = false; Member member = order.getMessage().getAddress(); Counter cnt = getInCounter(member); @@ -147,28 +161,50 @@ return result; } - public synchronized void memberAdded(Member member) { + public void memberAdded(Member member) { + //reset counters + try { + inLock.writeLock().lock(); + getInCounter(member); + }finally { + inLock.writeLock().unlock(); + } + try { + outLock.writeLock().lock(); + getOutCounter(member); + }finally { + outLock.writeLock().unlock(); + } //notify upwards - getInCounter(member); - getOutCounter(member); super.memberAdded(member); } - public synchronized void memberDisappeared(Member member) { - //notify upwards - outcounter.remove(member); - incounter.remove(member); + public void memberDisappeared(Member member) { + //reset counters + try { + inLock.writeLock().lock(); + incounter.remove(member); + }finally { + inLock.writeLock().unlock(); + } + try { + outLock.writeLock().lock(); + outcounter.remove(member); + }finally { + outLock.writeLock().unlock(); + } //clear the remaining queue processLeftOvers(member,true); + //notify upwards super.memberDisappeared(member); } - public int incCounter(Member mbr) { + protected int incCounter(Member mbr) { Counter cnt = getOutCounter(mbr); return cnt.inc(); } - public Counter getInCounter(Member mbr) { + protected Counter getInCounter(Member mbr) { Counter cnt = (Counter)incounter.get(mbr); if ( cnt == null ) { cnt = new Counter(); @@ -178,7 +214,7 @@ return cnt; } - public Counter getOutCounter(Member mbr) { + protected Counter getOutCounter(Member mbr) { Counter cnt = (Counter)outcounter.get(mbr); if ( cnt == null ) { cnt = new Counter(); @@ -187,7 +223,7 @@ return cnt; } - public static class Counter { + protected static class Counter { private AtomicInteger value = new AtomicInteger(0); public int getCounter() { @@ -203,7 +239,7 @@ } } - public static class MessageOrder { + protected static class MessageOrder { private long received = System.currentTimeMillis(); private MessageOrder next; private int msgNr; Modified: tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java URL: http://svn.apache.org/viewvc/tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java?view=diff&rev=538920&r1=538919&r2=538920 ============================================================================== --- tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java (original) +++ tomcat/tc6.0.x/trunk/test/org/apache/catalina/tribes/test/interceptors/TestOrderInterceptor.java Thu May 17 06:45:15 2007 @@ -31,6 +31,7 @@ import org.apache.catalina.tribes.ChannelMessage; import org.apache.catalina.tribes.group.InterceptorPayload; import org.apache.catalina.tribes.ChannelException; +import java.util.concurrent.atomic.AtomicInteger; public class TestOrderInterceptor extends TestCase { @@ -76,8 +77,9 @@ public void testOrder1() throws Exception { Member[] dest = channels[0].getMembers(); + final AtomicInteger value = new AtomicInteger(0); for ( int i=0; i<100; i++ ) { - channels[0].send(dest,new Integer(i),0); + channels[0].send(dest,new Integer(value.getAndAdd(1)),0); } Thread.sleep(5000); for ( int i=0; i<test.length; i++ ) { @@ -85,6 +87,40 @@ } } + public void testOrder2() throws Exception { + final Member[] dest = channels[0].getMembers(); + final AtomicInteger value = new AtomicInteger(0); + Runnable run = new Runnable() { + public void run() { + for (int i = 0; i < 100; i++) { + try { + synchronized (channels[0]) { + channels[0].send(dest, new Integer(value.getAndAdd(1)), 0); + } + }catch ( Exception x ) { + x.printStackTrace(); + assertEquals(true,false); + } + } + } + }; + Thread[] threads = new Thread[5]; + for (int i=0;i<threads.length;i++) { + threads[i] = new Thread(run); + } + for (int i=0;i<threads.length;i++) { + threads[i].start(); + } + for (int i=0;i<threads.length;i++) { + threads[i].join(); + } + Thread.sleep(5000); + for ( int i=0; i<test.length; i++ ) { + super.assertEquals(false,test[i].fail); + } + } + + protected void tearDown() throws Exception { System.out.println("tearDown"); super.tearDown(); @@ -112,7 +148,7 @@ Integer i = (Integer)msg; if ( i.intValue() != cnt ) fail = true; else cnt++; - System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total); + System.out.println("Listener["+id+"] Message received:"+i+" Count:"+total+" Fail:"+fail); } --------------------------------------------------------------------- To unsubscribe, e-mail: [EMAIL PROTECTED] For additional commands, e-mail: [EMAIL PROTECTED]