Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaManager.java URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaManager.java?rev=1831414&r1=1831413&r2=1831414&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaManager.java (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/DeltaManager.java Fri May 11 13:15:13 2018 @@ -5,9 +5,9 @@ * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -44,14 +44,14 @@ import org.apache.tomcat.util.res.String * The DeltaManager manages replicated sessions by only replicating the deltas * in data. For applications written to handle this, the DeltaManager is the * optimal way of replicating data. - * + * * This code is almost identical to StandardManager with a difference in how it * persists sessions and some modifications to it. - * + * * <b>IMPLEMENTATION NOTE </b>: Correct behavior of session storing and * reloading depends upon external calls to the <code>start()</code> and * <code>stop()</code> methods of this class at the correct times. - * + * * @author Filip Hanik * @author Craig R. 
McClanahan * @author Jean-Francois Arcand @@ -79,7 +79,7 @@ public class DeltaManager extends Cluste */ protected static String managerName = "DeltaManager"; protected String name = null; - + private boolean expireSessionsOnShutdown = false; private boolean notifySessionListenersOnReplication = true; private boolean notifyContainerListenersOnReplication = true; @@ -88,19 +88,19 @@ public class DeltaManager extends Cluste private int stateTransferTimeout = 60; private boolean sendAllSessions = true; private int sendAllSessionsSize = 1000 ; - + /** - * wait time between send session block (default 2 sec) + * wait time between send session block (default 2 sec) */ - private int sendAllSessionsWaitTime = 2 * 1000 ; + private int sendAllSessionsWaitTime = 2 * 1000 ; private ArrayList<SessionMessage> receivedMessageQueue = new ArrayList<SessionMessage>() ; private boolean receiverQueue = false ; private boolean stateTimestampDrop = true ; - private long stateTransferCreateSendTime; - + private long stateTransferCreateSendTime; + // ------------------------------------------------------------------ stats attributes - + private long sessionReplaceCounter = 0 ; private long counterReceive_EVT_GET_ALL_SESSIONS = 0 ; private long counterReceive_EVT_ALL_SESSION_DATA = 0 ; @@ -120,7 +120,7 @@ public class DeltaManager extends Cluste private int counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0 ; private long counterSend_EVT_CHANGE_SESSION_ID = 0; private int counterNoStateTransfered = 0 ; - + // ------------------------------------------------------------- Constructor public DeltaManager() { @@ -128,7 +128,7 @@ public class DeltaManager extends Cluste } // ------------------------------------------------------------- Properties - + /** * Return descriptive information about this Manager implementation and the * corresponding version number, in the format @@ -155,14 +155,14 @@ public class DeltaManager extends Cluste public long getCounterSend_EVT_GET_ALL_SESSIONS() { return 
counterSend_EVT_GET_ALL_SESSIONS; } - + /** * @return Returns the counterSend_EVT_SESSION_ACCESSED. */ public long getCounterSend_EVT_SESSION_ACCESSED() { return counterSend_EVT_SESSION_ACCESSED; } - + /** * @return Returns the counterSend_EVT_SESSION_CREATED. */ @@ -183,7 +183,7 @@ public class DeltaManager extends Cluste public long getCounterSend_EVT_SESSION_EXPIRED() { return counterSend_EVT_SESSION_EXPIRED; } - + /** * @return Returns the counterSend_EVT_ALL_SESSION_DATA. */ @@ -211,43 +211,43 @@ public class DeltaManager extends Cluste public long getCounterReceive_EVT_ALL_SESSION_DATA() { return counterReceive_EVT_ALL_SESSION_DATA; } - + /** * @return Returns the counterReceive_EVT_GET_ALL_SESSIONS. */ public long getCounterReceive_EVT_GET_ALL_SESSIONS() { return counterReceive_EVT_GET_ALL_SESSIONS; } - + /** * @return Returns the counterReceive_EVT_SESSION_ACCESSED. */ public long getCounterReceive_EVT_SESSION_ACCESSED() { return counterReceive_EVT_SESSION_ACCESSED; } - + /** * @return Returns the counterReceive_EVT_SESSION_CREATED. */ public long getCounterReceive_EVT_SESSION_CREATED() { return counterReceive_EVT_SESSION_CREATED; } - + /** * @return Returns the counterReceive_EVT_SESSION_DELTA. */ public long getCounterReceive_EVT_SESSION_DELTA() { return counterReceive_EVT_SESSION_DELTA; } - + /** * @return Returns the counterReceive_EVT_SESSION_EXPIRED. */ public long getCounterReceive_EVT_SESSION_EXPIRED() { return counterReceive_EVT_SESSION_EXPIRED; } - - + + /** * @return Returns the counterReceive_EVT_ALL_SESSION_TRANSFERCOMPLETE. */ @@ -276,25 +276,25 @@ public class DeltaManager extends Cluste public long getProcessingTime() { return processingTime; } - + /** * @return Returns the sessionReplaceCounter. */ public long getSessionReplaceCounter() { return sessionReplaceCounter; } - + /** * @return Returns the counterNoStateTransfered. 
*/ public int getCounterNoStateTransfered() { return counterNoStateTransfered; } - + public int getReceivedQueueSize() { return receivedMessageQueue.size() ; } - + /** * @return Returns the stateTransferTimeout. */ @@ -310,14 +310,14 @@ public class DeltaManager extends Cluste /** * is session state transfered complete? - * + * */ public boolean getStateTransfered() { return stateTransfered; } /** - * set that state ist complete transfered + * set that state ist complete transfered * @param stateTransfered */ public void setStateTransfered(boolean stateTransfered) { @@ -338,72 +338,72 @@ public class DeltaManager extends Cluste public int getSendAllSessionsWaitTime() { return sendAllSessionsWaitTime; } - + /** * @param sendAllSessionsWaitTime The sendAllSessionsWaitTime to set at msec. */ public void setSendAllSessionsWaitTime(int sendAllSessionsWaitTime) { this.sendAllSessionsWaitTime = sendAllSessionsWaitTime; } - + /** * @return Returns the stateTimestampDrop. */ public boolean isStateTimestampDrop() { return stateTimestampDrop; } - + /** * @param isTimestampDrop The new flag value */ public void setStateTimestampDrop(boolean isTimestampDrop) { this.stateTimestampDrop = isTimestampDrop; } - + /** - * + * * @return Returns the sendAllSessions. */ public boolean isSendAllSessions() { return sendAllSessions; } - + /** * @param sendAllSessions The sendAllSessions to set. */ public void setSendAllSessions(boolean sendAllSessions) { this.sendAllSessions = sendAllSessions; } - + /** * @return Returns the sendAllSessionsSize. */ public int getSendAllSessionsSize() { return sendAllSessionsSize; } - + /** * @param sendAllSessionsSize The sendAllSessionsSize to set. */ public void setSendAllSessionsSize(int sendAllSessionsSize) { this.sendAllSessionsSize = sendAllSessionsSize; } - + /** * @return Returns the notifySessionListenersOnReplication. 
*/ public boolean isNotifySessionListenersOnReplication() { return notifySessionListenersOnReplication; } - + /** * @param notifyListenersCreateSessionOnReplication The notifySessionListenersOnReplication to set. */ public void setNotifySessionListenersOnReplication(boolean notifyListenersCreateSessionOnReplication) { this.notifySessionListenersOnReplication = notifyListenersCreateSessionOnReplication; } - - + + public boolean isExpireSessionsOnShutdown() { return expireSessionsOnShutdown; } @@ -411,7 +411,7 @@ public class DeltaManager extends Cluste public void setExpireSessionsOnShutdown(boolean expireSessionsOnShutdown) { this.expireSessionsOnShutdown = expireSessionsOnShutdown; } - + public boolean isNotifyContainerListenersOnReplication() { return notifyContainerListenersOnReplication; } @@ -420,7 +420,7 @@ public class DeltaManager extends Cluste boolean notifyContainerListenersOnReplication) { this.notifyContainerListenersOnReplication = notifyContainerListenersOnReplication; } - + // --------------------------------------------------------- Public Methods @@ -432,7 +432,7 @@ public class DeltaManager extends Cluste /** * Create new session with check maxActiveSessions and send session creation * to other cluster nodes. 
- * + * * @param distribute * @return The session */ @@ -454,10 +454,10 @@ public class DeltaManager extends Cluste */ protected void sendCreateSession(String sessionId, DeltaSession session) { if(cluster.getMembers().length > 0 ) { - SessionMessage msg = + SessionMessage msg = new SessionMessageImpl(getName(), - SessionMessage.EVT_SESSION_CREATED, - null, + SessionMessage.EVT_SESSION_CREATED, + null, sessionId, sessionId + "-" + System.currentTimeMillis()); if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.sendMessage.newSession",name, sessionId)); @@ -466,7 +466,7 @@ public class DeltaManager extends Cluste send(msg); } } - + /** * Send messages to other backup member (domain or all) * @param msg Session message @@ -486,7 +486,7 @@ public class DeltaManager extends Cluste public Session createEmptySession() { return getNewDeltaSession() ; } - + /** * Get new session class to be used in the doLoad() method. */ @@ -573,7 +573,7 @@ public class DeltaManager extends Cluste /** * serialize DeltaRequest * @see DeltaRequest#writeExternal(java.io.ObjectOutput) - * + * * @param deltaRequest * @return serialized delta request * @throws IOException @@ -661,7 +661,7 @@ public class DeltaManager extends Cluste * Save any currently active sessions in the appropriate persistence * mechanism, if any. If persistence is not supported, this method returns * without doing anything. 
- * + * * @exception IOException * if an input/output error occurs */ @@ -676,7 +676,7 @@ public class DeltaManager extends Cluste oos = new ObjectOutputStream(new BufferedOutputStream(fos)); oos.writeObject(Integer.valueOf(currentSessions.length)); for(int i=0 ; i < currentSessions.length;i++) { - ((DeltaSession)currentSessions[i]).writeObjectData(oos); + ((DeltaSession)currentSessions[i]).writeObjectData(oos); } // Flush and close the output stream oos.flush(); @@ -733,7 +733,7 @@ public class DeltaManager extends Cluste ExceptionUtils.handleThrowable(t); log.error(sm.getString("deltaManager.managerLoad"), t); } - + setState(LifecycleState.STARTING); } @@ -781,7 +781,7 @@ public class DeltaManager extends Cluste } } } - } + } receivedMessageQueue.clear(); receiverQueue = false ; } @@ -793,7 +793,7 @@ public class DeltaManager extends Cluste /** * Find the master of the session state - * @return master member of sessions + * @return master member of sessions */ protected Member findSessionMasterMember() { Member mbr = null; @@ -861,7 +861,7 @@ public class DeltaManager extends Cluste log.debug(sm.getString("deltaManager.stopped", getName())); setState(LifecycleState.STOPPING); - + // Expire all active sessions if (log.isInfoEnabled()) log.info(sm.getString("deltaManager.expireSessions", getName())); Session sessions[] = findSessions(); @@ -873,7 +873,7 @@ public class DeltaManager extends Cluste session.expire(true, isExpireSessionsOnShutdown()); } catch (Throwable t) { ExceptionUtils.handleThrowable(t); - } + } } // Require a new random number generator if we are restarted @@ -886,7 +886,7 @@ public class DeltaManager extends Cluste /** * A message was received from another node, this is the callback method to * implement if you are interested in receiving replication messages. - * + * * @param cmsg - * the message received. 
*/ @@ -896,8 +896,8 @@ public class DeltaManager extends Cluste SessionMessage msg = (SessionMessage) cmsg; switch (msg.getEventType()) { case SessionMessage.EVT_GET_ALL_SESSIONS: - case SessionMessage.EVT_SESSION_CREATED: - case SessionMessage.EVT_SESSION_EXPIRED: + case SessionMessage.EVT_SESSION_CREATED: + case SessionMessage.EVT_SESSION_EXPIRED: case SessionMessage.EVT_SESSION_ACCESSED: case SessionMessage.EVT_SESSION_DELTA: case SessionMessage.EVT_CHANGE_SESSION_ID: { @@ -914,7 +914,7 @@ public class DeltaManager extends Cluste break; } } //switch - + messageReceived(msg, msg.getAddress() != null ? (Member) msg.getAddress() : null); } } @@ -925,7 +925,7 @@ public class DeltaManager extends Cluste * needed or not. If there is a need for replication, the manager will * create a session message and that will be replicated. The cluster * determines where it gets sent. - * + * * @param sessionId - * the sessionId that just completed. * @return a SessionMessage to be sent, @@ -941,9 +941,9 @@ public class DeltaManager extends Cluste * needed or not. If there is a need for replication, the manager will * create a session message and that will be replicated. The cluster * determines where it gets sent. - * + * * Session expiration also calls this method, but with expires == true. - * + * * @param sessionId - * the sessionId that just completed. 
* @param expires - @@ -962,16 +962,16 @@ public class DeltaManager extends Cluste } DeltaRequest deltaRequest = session.getDeltaRequest(); session.lock(); - if (deltaRequest.getSize() > 0) { + if (deltaRequest.getSize() > 0) { counterSend_EVT_SESSION_DELTA++; byte[] data = serializeDeltaRequest(session,deltaRequest); msg = new SessionMessageImpl(getName(), - SessionMessage.EVT_SESSION_DELTA, - data, + SessionMessage.EVT_SESSION_DELTA, + data, sessionId, sessionId + "-" + System.currentTimeMillis()); session.resetDeltaRequest(); - } + } } catch (IOException x) { log.error(sm.getString("deltaManager.createMessage.unableCreateDeltaRequest",sessionId), x); return null; @@ -982,14 +982,14 @@ public class DeltaManager extends Cluste if(!expires && !session.isPrimarySession()) { counterSend_EVT_SESSION_ACCESSED++; msg = new SessionMessageImpl(getName(), - SessionMessage.EVT_SESSION_ACCESSED, - null, + SessionMessage.EVT_SESSION_ACCESSED, + null, sessionId, sessionId + "-" + System.currentTimeMillis()); if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createMessage.accessChangePrimary",getName(), sessionId)); } - } + } } else { // log only outside synch block! 
if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createMessage.delta",getName(), sessionId)); @@ -999,13 +999,13 @@ public class DeltaManager extends Cluste //check to see if we need to send out an access message if (!expires && (msg == null)) { long replDelta = System.currentTimeMillis() - session.getLastTimeReplicated(); - if (session.getMaxInactiveInterval() >=0 && + if (session.getMaxInactiveInterval() >=0 && replDelta > (session.getMaxInactiveInterval() * 1000L)) { counterSend_EVT_SESSION_ACCESSED++; msg = new SessionMessageImpl(getName(), - SessionMessage.EVT_SESSION_ACCESSED, + SessionMessage.EVT_SESSION_ACCESSED, null, - sessionId, + sessionId, sessionId + "-" + System.currentTimeMillis()); if (log.isDebugEnabled()) { log.debug(sm.getString("deltaManager.createMessage.access", getName(),sessionId)); @@ -1061,14 +1061,14 @@ public class DeltaManager extends Cluste counterSend_EVT_SESSION_EXPIRED = 0 ; counterSend_EVT_ALL_SESSION_TRANSFERCOMPLETE = 0; counterSend_EVT_CHANGE_SESSION_ID = 0; - + } - + // -------------------------------------------------------- expire /** * send session expired to other cluster nodes - * + * * @param id * session id */ @@ -1092,7 +1092,7 @@ public class DeltaManager extends Cluste Session sessions[] = findSessions(); int expireDirect = 0 ; int expireIndirect = 0 ; - + if(log.isDebugEnabled()) log.debug("Start expire all sessions " + getName() + " at " + timeNow + " sessioncount " + sessions.length); for (int i = 0; i < sessions.length; i++) { if (sessions[i] instanceof DeltaSession) { @@ -1109,9 +1109,9 @@ public class DeltaManager extends Cluste }//for long timeEnd = System.currentTimeMillis(); if(log.isDebugEnabled()) log.debug("End expire sessions " + getName() + " expire processingTime " + (timeEnd - timeNow) + " expired direct sessions: " + expireDirect + " expired direct sessions: " + expireIndirect); - + } - + @Override public String[] getInvalidatedSessions() { return new String[0]; @@ -1122,7 +1122,7 @@ 
public class DeltaManager extends Cluste /** * This method is called by the received thread when a SessionMessage has * been received from one of the other nodes in the cluster. - * + * * @param msg - * the message received * @param sender - @@ -1133,11 +1133,11 @@ public class DeltaManager extends Cluste protected void messageReceived(SessionMessage msg, Member sender) { ClassLoader contextLoader = Thread.currentThread().getContextClassLoader(); try { - + ClassLoader[] loaders = getClassLoaders(); Thread.currentThread().setContextClassLoader(loaders[0]); if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.eventType",getName(), msg.getEventTypeString(), sender)); - + switch (msg.getEventType()) { case SessionMessage.EVT_GET_ALL_SESSIONS: { handleGET_ALL_SESSIONS(msg,sender); @@ -1213,14 +1213,22 @@ public class DeltaManager extends Cluste counterReceive_EVT_SESSION_DELTA++; byte[] delta = msg.getSession(); DeltaSession session = (DeltaSession) findSession(msg.getSessionID()); - if (session != null) { - if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.delta",getName(), msg.getSessionID())); + if (session == null) { + if (log.isDebugEnabled()) { + log.debug(sm.getString("deltaManager.receiveMessage.delta.unknown", + getName(), msg.getSessionID())); + } + } else { + if (log.isDebugEnabled()) { + log.debug(sm.getString("deltaManager.receiveMessage.delta", + getName(), msg.getSessionID())); + } try { session.lock(); DeltaRequest dreq = deserializeDeltaRequest(session, delta); dreq.execute(session, isNotifyListenersOnReplication()); session.setPrimarySession(false); - }finally { + } finally { session.unlock(); } } @@ -1267,7 +1275,6 @@ public class DeltaManager extends Cluste counterReceive_EVT_SESSION_CREATED++; if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.receiveMessage.createNewSession",getName(), msg.getSessionID())); DeltaSession session = (DeltaSession) createEmptySession(); - 
session.setManager(this); session.setValid(true); session.setPrimarySession(false); session.setCreationTime(msg.getTimestamp()); @@ -1275,7 +1282,6 @@ public class DeltaManager extends Cluste session.setMaxInactiveInterval(((Context) getContainer()).getSessionTimeout() * 60, false); session.access(); session.setId(msg.getSessionID(), notifySessionListenersOnReplication); - session.resetDeltaRequest(); session.endAccess(); } @@ -1332,7 +1338,7 @@ public class DeltaManager extends Cluste }//end if }//for }//end if - + SessionMessage newmsg = new SessionMessageImpl(name,SessionMessage.EVT_ALL_SESSION_TRANSFERCOMPLETE, null,"SESSION-STATE-TRANSFERED", "SESSION-STATE-TRANSFERED"+ getName()); newmsg.setTimestamp(findSessionTimestamp); if (log.isDebugEnabled()) log.debug(sm.getString("deltaManager.createMessage.allSessionTransfered",getName())); @@ -1399,7 +1405,7 @@ public class DeltaManager extends Cluste result.stateTransferTimeout = stateTransferTimeout; result.sendAllSessions = sendAllSessions; result.sendAllSessionsSize = sendAllSessionsSize; - result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ; + result.sendAllSessionsWaitTime = sendAllSessionsWaitTime ; result.stateTimestampDrop = stateTimestampDrop ; return result; }
Modified: tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/LocalStrings.properties URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/LocalStrings.properties?rev=1831414&r1=1831413&r2=1831414&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/LocalStrings.properties (original) +++ tomcat/tc7.0.x/trunk/java/org/apache/catalina/ha/session/LocalStrings.properties Fri May 11 13:15:13 2018 @@ -39,6 +39,7 @@ deltaManager.expireSessions=Manager [{0} deltaManager.receiveMessage.accessed=Manager [{0}]: received session [{1}] accessed. deltaManager.receiveMessage.createNewSession=Manager [{0}]: received session [{1}] created. deltaManager.receiveMessage.delta=Manager [{0}]: received session [{1}] delta. +deltaManager.receiveMessage.delta.unknown=Manager [{0}]: received unknown session [{1}] delta. deltaManager.receiveMessage.error=Manager [{0}]: Unable to receive message through TCP channel deltaManager.receiveMessage.eventType=Manager [{0}]: Received SessionMessage of type=({1}) from [{2}] deltaManager.receiveMessage.expired=Manager [{0}]: received session [{1}] expired. Modified: tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml URL: http://svn.apache.org/viewvc/tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml?rev=1831414&r1=1831413&r2=1831414&view=diff ============================================================================== --- tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml (original) +++ tomcat/tc7.0.x/trunk/webapps/docs/changelog.xml Fri May 11 13:15:13 2018 @@ -132,6 +132,16 @@ </fix> </changelog> </subsection> + <subsection name="Cluster"> + <changelog> + <fix> + Remove duplicate calls when creating a replicated session to reduce the + time taken to create the session and thereby reduce the chances of a + subsequent session update message being ignored because the session does + not yet exist. 
(markt) + </fix> + </changelog> + </subsection> <subsection name="Tribes"> <changelog> <fix> --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscribe@tomcat.apache.org For additional commands, e-mail: dev-help@tomcat.apache.org