fhanik      2004/01/12 20:22:28

  Modified:    modules/cluster/src/share/org/apache/catalina/cluster/session
                        DeltaManager.java DeltaRequest.java
                        DeltaSession.java
               modules/cluster/src/share/org/apache/catalina/cluster/tcp
                        PooledSocketSender.java ReplicationTransmitter.java
                        SimpleTcpCluster.java
  Log:
  Fixed a deadlock in the pooled socket sender that occurred when a member crashes
  Recycling the delta request objects to avoid object instantiation, although I 
actually think this is slower
  Fixed the callback with the session and the delta request execution
  
  Revision  Changes    Path
  1.5       +9 -10     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java
  
  Index: DeltaManager.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaManager.java,v
  retrieving revision 1.4
  retrieving revision 1.5
  diff -u -r1.4 -r1.5
  --- DeltaManager.java 13 Jan 2004 00:07:18 -0000      1.4
  +++ DeltaManager.java 13 Jan 2004 04:22:28 -0000      1.5
  @@ -406,24 +406,23 @@
       }
   
   
  -    private DeltaRequest loadDeltaRequest(byte[] data) throws
  +    private DeltaRequest loadDeltaRequest(DeltaSession session, byte[] data) throws
           ClassNotFoundException, IOException {
           ByteArrayInputStream fis = null;
           ReplicationStream ois = null;
           Loader loader = null;
           ClassLoader classLoader = null;
           fis = new ByteArrayInputStream(data);
  -        BufferedInputStream bis = new BufferedInputStream(fis);
           ois = new ReplicationStream(fis,container.getLoader().getClassLoader());
  -        DeltaRequest dreq = (DeltaRequest)ois.readObject();
  +        session.getDeltaRequest().readExternal(ois);
           ois.close();
  -        return dreq;
  +        return session.getDeltaRequest();
       }
       
       private byte[] unloadDeltaRequest(DeltaRequest deltaRequest) throws IOException 
{
           ByteArrayOutputStream bos = new ByteArrayOutputStream();
           ObjectOutputStream oos = new ObjectOutputStream(bos);
  -        oos.writeObject(deltaRequest);
  +        deltaRequest.writeExternal(oos);
           oos.flush();
           oos.close();
           return bos.toByteArray();
  @@ -874,8 +873,8 @@
                      }
                      case SessionMessage.EVT_SESSION_DELTA : {
                          byte[] delta = msg.getSession();
  -                       DeltaRequest dreq = loadDeltaRequest(delta);
                          DeltaSession session = 
(DeltaSession)findSession(msg.getSessionID());
  +                       DeltaRequest dreq = loadDeltaRequest(session,delta);
                          dreq.execute(session);
                          session.setPrimarySession(false);
                          
  
  
  
  1.4       +71 -23    
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaRequest.java
  
  Index: DeltaRequest.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaRequest.java,v
  retrieving revision 1.3
  retrieving revision 1.4
  diff -u -r1.3 -r1.4
  --- DeltaRequest.java 13 Jan 2004 00:07:18 -0000      1.3
  +++ DeltaRequest.java 13 Jan 2004 04:22:28 -0000      1.4
  @@ -66,7 +66,9 @@
   
   /**
    * This class is used to track the series of actions that happens when
  - * a request is executed. These actions will then
  + * a request is executed. These actions will then translate into invokations of 
methods 
  + * on the actual session.
  + * This class is NOT thread safe. One DeltaRequest per session
    * @author <a href="mailto:[EMAIL PROTECTED]">Filip Hanik</a>
    * @version 1.0
    */
  @@ -95,6 +97,8 @@
   
       private String sessionId;
       private LinkedList actions = new LinkedList();
  +    private LinkedList actionPool = new LinkedList();
  +    
       private boolean recordAllActions = false;
   
       public DeltaRequest() {
  @@ -140,7 +144,13 @@
                                int action,
                                String name,
                                Object value) {
  -        AttributeInfo info = new AttributeInfo(type,action,name,value);
  +        AttributeInfo info = null;
  +        if ( this.actionPool.size() > 0 ) {
  +            info = (AttributeInfo)actionPool.removeFirst();
  +            info.init(type,action,name,value);
  +        } else {
  +            info = new AttributeInfo(type, action, name, value);
  +        }
           //if we have already done something to this attribute, make sure
           //we don't send multiple actions across the wire
           if ( !recordAllActions) actions.remove(info);
  @@ -148,25 +158,25 @@
           actions.addLast(info);
       }
   
  -    public void execute(ClusterSession session) {
  +    public void execute(DeltaSession session) {
           if ( !this.sessionId.equals( session.getId() ) )
               throw new java.lang.IllegalArgumentException("Session id mismatch, not 
executing the delta request");
           for ( int i=0; i<actions.size(); i++ ) {
               AttributeInfo info = (AttributeInfo)actions.get(i);
               switch ( info.getType() ) {
                   case TYPE_ATTRIBUTE: {
  -                    if ( info.getAction() == ACTION_SET )
  -                        session.setAttribute(info.getName(),info.getValue());
  -                    else
  -                        session.removeAttribute(info.getName());
  +                    if ( info.getAction() == ACTION_SET ) {
  +                        session.setAttribute(info.getName(), info.getValue(),false);
  +                    }  else
  +                        session.removeAttribute(info.getName(),true,false);
                       break;
                   }//case
                   case TYPE_ISNEW: {
  -                    session.setNew(((Boolean)info.getValue()).booleanValue());
  +                    session.setNew(((Boolean)info.getValue()).booleanValue(),false);
                       break;
                   }//case
                   case TYPE_MAXINTERVAL: {
  -                    
session.setMaxInactiveInterval(((Integer)info.getValue()).intValue());
  +                    
session.setMaxInactiveInterval(((Integer)info.getValue()).intValue(),false);
                       break;
                   }//case
                   case TYPE_PRINCIPAL: {
  @@ -175,7 +185,7 @@
                           SerializablePrincipal sp = 
(SerializablePrincipal)info.getValue();
                           p = 
(Principal)sp.getPrincipal(session.getManager().getContainer().getRealm());
                       }
  -                    session.setPrincipal(p);
  +                    session.setPrincipal(p,false);
                       break;
                   }//case
                   default : throw new java.lang.IllegalArgumentException("Invalid 
attribute info type="+info);
  @@ -184,8 +194,14 @@
       }
   
       public void reset() {
  +        while ( actions.size() > 0 ) {
  +            AttributeInfo info = (AttributeInfo)actions.removeFirst();
  +            info.recycle();
  +            actionPool.addLast(info);
  +        }
           actions.clear();
       }
  +    
       public String getSessionId() {
           return sessionId;
       }
  @@ -199,21 +215,29 @@
           return actions.size();
       }
       
  -    public void readExternal(java.io.ObjectInput in ) throws java.io.IOException,
  -    java.lang.ClassNotFoundException {
  -    //sessionId - String
  -    //recordAll - boolean
  -    //size - int
  -    //AttributeInfo - in an array
  +    public void readExternal(java.io.ObjectInput in) throws java.io.IOException,
  +        java.lang.ClassNotFoundException {
  +        //sessionId - String
  +        //recordAll - boolean
  +        //size - int
  +        //AttributeInfo - in an array
  +        reset();
           sessionId = in.readUTF();
           recordAllActions = in.readBoolean();
           int cnt = in.readInt();
  -        if ( actions == null )
  +        if (actions == null)
               actions = new LinkedList();
           else
               actions.clear();
           for (int i = 0; i < cnt; i++) {
  -            AttributeInfo info = (AttributeInfo)in.readObject();
  +            AttributeInfo info = null;
  +            if (this.actionPool.size() > 0) {
  +                info = (AttributeInfo) actionPool.removeFirst();
  +            }
  +            else {
  +                info = new AttributeInfo(-1,-1,null,null);
  +            }
  +            info.readExternal(in);
               actions.addLast(info);
           }//for
       }
  @@ -230,7 +254,7 @@
           out.writeInt(getSize());
           for ( int i=0; i<getSize(); i++ ) {
               AttributeInfo info = (AttributeInfo)actions.get(i);
  -            out.writeObject(info);
  +            info.writeExternal(out);
           }
       }
   
  @@ -239,11 +263,21 @@
           private Object value = null;
           private int action;
           private int type;
  +
           public AttributeInfo() {}
  +
           public AttributeInfo(int type,
                                int action,
                                String name,
                                Object value) {
  +            super();
  +            init(type,action,name,value);
  +        }
  +
  +        public void init(int type,
  +                         int action,
  +                         String name,
  +                         Object value) {
               this.name = name;
               this.value = value;
               this.action = action;
  @@ -268,6 +302,13 @@
           public String getName() {
               return name;
           }
  +        
  +        public void recycle() {
  +            name = null;
  +            value = null;
  +            type=-1;
  +            action=-1;
  +        }
   
           public boolean equals(Object o) {
               if ( ! (o instanceof AttributeInfo ) ) return false;
  @@ -298,7 +339,14 @@
               out.writeUTF(getName());
               out.writeObject(getValue());
           }
  -
  +        
  +        public String toString() {
  +            StringBuffer buf = new StringBuffer("AttributeInfo[type=");
  +            buf.append(getType()).append(", action=").append(getAction());
  +            buf.append(", name=").append(getName()).append(", 
value=").append(getValue());
  +            buf.append(", addr=").append(super.toString()).append("]");
  +            return buf.toString();
  +        }
   
       }
   
  
  
  
  1.7       +30 -16    
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java
  
  Index: DeltaSession.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/session/DeltaSession.java,v
  retrieving revision 1.6
  retrieving revision 1.7
  diff -u -r1.6 -r1.7
  --- DeltaSession.java 12 Jan 2004 07:50:06 -0000      1.6
  +++ DeltaSession.java 13 Jan 2004 04:22:28 -0000      1.7
  @@ -535,12 +535,15 @@
        * @param interval The new maximum interval
        */
       public void setMaxInactiveInterval(int interval) {
  +        setMaxInactiveInterval(interval,true);
  +    }
  +    public void setMaxInactiveInterval(int interval, boolean addDeltaRequest) {
   
           this.maxInactiveInterval = interval;
           if (isValid && interval == 0) {
               expire();
           } else {
  -            deltaRequest.setMaxInactiveInterval(interval);
  +            if ( addDeltaRequest ) deltaRequest.setMaxInactiveInterval(interval);
           }
   
       }
  @@ -552,8 +555,11 @@
        * @param isNew The new value for the <code>isNew</code> flag
        */
       public void setNew(boolean isNew) {
  +        setNew(isNew,true);
  +    }
  +    public void setNew(boolean isNew, boolean addDeltaRequest) {
           this.isNew = isNew;
  -        deltaRequest.setNew(isNew);
  +        if (addDeltaRequest) deltaRequest.setNew(isNew);
       }
   
   
  @@ -580,11 +586,13 @@
        * @param principal The new Principal, or <code>null</code> if none
        */
       public void setPrincipal(Principal principal) {
  -
  +        setPrincipal(principal,true);
  +    }
  +    public void setPrincipal(Principal principal,boolean addDeltaRequest) {
           Principal oldPrincipal = this.principal;
           this.principal = principal;
           support.firePropertyChange("principal", oldPrincipal, this.principal);
  -        deltaRequest.setPrincipal(principal);
  +        if (addDeltaRequest) deltaRequest.setPrincipal(principal);
       }
   
   
  @@ -919,6 +927,11 @@
               deltaRequest.setSessionId(getId());
           }
       }
  +    
  +    public DeltaRequest getDeltaRequest() {
  +        if ( deltaRequest == null ) resetDeltaRequest();
  +        return deltaRequest;
  +    }
   
   
       // ------------------------------------------------- HttpSession Properties
  @@ -1162,6 +1175,10 @@
        *  invalidated session
        */
       public void removeAttribute(String name, boolean notify) {
  +        removeAttribute(name,notify,true);
  +    }
  +    
  +    public void removeAttribute(String name, boolean notify, boolean 
addDeltaRequest) {
   
           // Validate our current state
           if (!isValid())
  @@ -1181,7 +1198,7 @@
               }
           }
           
  -        deltaRequest.removeAttribute(name);
  +        if (addDeltaRequest) deltaRequest.removeAttribute(name);
   
           // Do we need to do valueUnbound() and attributeRemoved() notification?
           if (!notify) {
  @@ -1271,6 +1288,9 @@
        *  invalidated session
        */
       public void setAttribute(String name, Object value) {
  +        setAttribute(name,value,true);
  +    }
  +    public void setAttribute(String name, Object value, boolean addDeltaRequest) {
   
           // Name cannot be null
           if (name == null)
  @@ -1287,7 +1307,7 @@
               throw new IllegalArgumentException("Attribute ["+name+"] is not 
serializable");
           }
           
  -        deltaRequest.setAttribute(name,value);
  +        if (addDeltaRequest) deltaRequest.setAttribute(name,value);
   
           // Validate our current state
           if (!isValid())
  @@ -1603,12 +1623,6 @@
           }
   
       }
  -
  -
  -    public DeltaRequest getDeltaRequest() {
  -        return deltaRequest;
  -    }
  -
   
   }
   
  
  
  
  1.2       +16 -5     
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java
  
  Index: PooledSocketSender.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/PooledSocketSender.java,v
  retrieving revision 1.1
  retrieving revision 1.2
  diff -u -r1.1 -r1.2
  --- PooledSocketSender.java   9 Jan 2004 23:24:09 -0000       1.1
  +++ PooledSocketSender.java   13 Jan 2004 04:22:28 -0000      1.2
  @@ -117,6 +117,7 @@
       public void connect() throws java.io.IOException
       {
           //do nothing, happens in the socket sender itself
  +        senderQueue.open();
       }
   
       public void disconnect()
  @@ -194,6 +195,7 @@
           private LinkedList queue = new LinkedList();
           private LinkedList inuse = new LinkedList();
           private Object mutex = new Object();
  +        private boolean isOpen = true;
   
           public SenderQueue(PooledSocketSender parent, int limit) {
               this.limit = limit;
  @@ -206,7 +208,7 @@
               long delta = 0;
               do {
                   synchronized (mutex) {
  -
  +                    if ( !isOpen ) throw new IllegalStateException("Socket pool is 
closed.");
                       if ( queue.size() > 0 ) {
                           sender = (SocketSender) queue.removeFirst();
                       } else if ( inuse.size() < limit ) {
  @@ -223,7 +225,7 @@
                       }
                   }//synchronized
                   delta = System.currentTimeMillis() - start;
  -            } while ( (sender == null) && (timeout==0?true:(delta<timeout)) );
  +            } while ( (isOpen) && (sender == null) && 
(timeout==0?true:(delta<timeout)) );
               //to do
               return sender;
           }
  @@ -259,6 +261,15 @@
                   }//for
                   queue.clear();
                   inuse.clear();
  +                isOpen = false;
  +                mutex.notifyAll();
  +            }
  +        }
  +        
  +        public void open() {
  +            synchronized (mutex) {
  +                isOpen = true;
  +                mutex.notifyAll();
               }
           }
       }
  
  
  
  1.12      +4 -4      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java
  
  Index: ReplicationTransmitter.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/ReplicationTransmitter.java,v
  retrieving revision 1.11
  retrieving revision 1.12
  diff -u -r1.11 -r1.12
  --- ReplicationTransmitter.java       20 Dec 2003 00:48:52 -0000      1.11
  +++ ReplicationTransmitter.java       13 Jan 2004 04:22:28 -0000      1.12
  @@ -101,7 +101,7 @@
           String key = addr.getHostAddress()+":"+port;
           IDataSender sender = (IDataSender)map.get(key);
           if ( sender == null ) return;
  -        if ( sender.isConnected() ) sender.disconnect();
  +        sender.disconnect();
           map.remove(key);
       }
   
  
  
  
  1.26      +8 -6      
jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java
  
  Index: SimpleTcpCluster.java
  ===================================================================
  RCS file: 
/home/cvs/jakarta-tomcat-catalina/modules/cluster/src/share/org/apache/catalina/cluster/tcp/SimpleTcpCluster.java,v
  retrieving revision 1.25
  retrieving revision 1.26
  diff -u -r1.25 -r1.26
  --- SimpleTcpCluster.java     13 Jan 2004 00:07:18 -0000      1.25
  +++ SimpleTcpCluster.java     13 Jan 2004 04:22:28 -0000      1.26
  @@ -699,8 +699,10 @@
                           ClusterManager mgr = (ClusterManager) managers.get(key);
                           if (mgr != null)
                               mgr.messageDataReceived(msg);
  -                        else
  -                            log.warn("Context manager doesn't exist:" + key);
  +                        else {
  +                            //this happens a lot before the system has started up
  +                            log.debug("Context manager doesn't exist:" + key);
  +                        }
                       }//while
                   } else {
                       ClusterManager mgr = (ClusterManager) managers.get(name);
  
  
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [EMAIL PROTECTED]
For additional commands, e-mail: [EMAIL PROTECTED]

Reply via email to