Author: rgodfrey
Date: Mon Apr 30 05:24:41 2007
New Revision: 533721
URL: http://svn.apache.org/viewvc?view=rev&rev=533721
Log:
QPID-476 : Remove duplicate map of channelId to session
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Mon Apr 30 05:24:41 2007
@@ -96,7 +96,8 @@
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map _sessions = new LinkedHashMap(); // fixme this is map is
replicated in amqprotocolsession as _channelId2SessionMap
+ private final Map<Integer,AMQSession> _sessions = new
LinkedHashMap<Integer,AMQSession>();
+
private String _clientName;
@@ -508,7 +509,7 @@
AMQSession session =
new AMQSession(AMQConnection.this, channelId,
transacted, acknowledgeMode, prefetchHigh,
prefetchLow);
- _protocolHandler.addSessionByChannel(channelId,
session);
+ //_protocolHandler.addSessionByChannel(channelId,
session);
registerSession(channelId, session);
boolean success = false;
@@ -527,7 +528,6 @@
{
if (!success)
{
-
_protocolHandler.removeSessionByChannel(channelId);
deregisterSession(channelId);
}
}
@@ -589,7 +589,6 @@
}
catch (AMQException e)
{
- _protocolHandler.removeSessionByChannel(channelId);
deregisterSession(channelId);
throw new AMQException("Error reopening channel " + channelId + "
after failover: " + e, e);
}
@@ -1136,7 +1135,7 @@
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
- _protocolHandler.addSessionByChannel(s.getChannelId(), s);
+ //_protocolHandler.addSessionByChannel(s.getChannelId(), s);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(),
s.getDefaultPrefetchLow(), s.getTransacted());
s.resubscribe();
}
@@ -1222,5 +1221,11 @@
public void performConnectionTask(Runnable task)
{
_taskPool.execute(task);
+ }
+
+
+ public AMQSession getSession(int channelId)
+ {
+ return _sessions.get(channelId);
}
}
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
Mon Apr 30 05:24:41 2007
@@ -429,17 +429,7 @@
}
}
- AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry)
- {
- this(con, channelId, transacted, acknowledgeMode,
messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK);
- }
- AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode,
- MessageFactoryRegistry messageFactoryRegistry, int
defaultPrefetch)
- {
- this(con, channelId, transacted, acknowledgeMode,
messageFactoryRegistry, defaultPrefetch, defaultPrefetch);
- }
AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode,
MessageFactoryRegistry messageFactoryRegistry, int
defaultPrefetchHighMark, int defaultPrefetchLowMark)
@@ -493,15 +483,7 @@
}
}
- AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode)
- {
- this(con, channelId, transacted, acknowledgeMode,
MessageFactoryRegistry.newDefaultRegistry());
- }
- AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode, int defaultPrefetch)
- {
- this(con, channelId, transacted, acknowledgeMode,
MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch);
- }
AMQSession(AMQConnection con, int channelId, boolean transacted, int
acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
{
@@ -796,7 +778,7 @@
amqe = new AMQException("Closing session forcibly", e);
}
_connection.deregisterSession(_channelId);
- closeProducersAndConsumers(amqe);
+ closeProducersAndConsumers(amqe);
}
}
@@ -809,6 +791,7 @@
_closed.set(true);
_connection.deregisterSession(_channelId);
markClosedProducersAndConsumers();
+
}
private void markClosedProducersAndConsumers()
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
Mon Apr 30 05:24:41 2007
@@ -490,27 +490,7 @@
new
SpecificMethodFrameListener(frame.getChannel(), responseClass), timeout);
}
- /**
- * Convenience method to register an AMQSession with the protocol handler.
Registering a session with the protocol
- * handler will ensure that messages are delivered to the consumer(s) on
that session.
- *
- * @param channelId the channel id of the session
- * @param session the session instance.
- */
- public void addSessionByChannel(int channelId, AMQSession session)
- {
- _protocolSession.addSessionByChannel(channelId, session);
- }
- /**
- * Convenience method to deregister an AMQSession with the protocol
handler.
- *
- * @param channelId then channel id of the session
- */
- public void removeSessionByChannel(int channelId)
- {
- _protocolSession.removeSessionByChannel(channelId);
- }
public void closeSession(AMQSession session) throws AMQException
{
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolSession.java
Mon Apr 30 05:24:41 2007
@@ -85,7 +85,7 @@
protected final AMQProtocolHandler _protocolHandler;
/** Maps from the channel id to the AMQSession that it represents. */
- protected ConcurrentMap _channelId2SessionMap = new ConcurrentHashMap();
+ protected ConcurrentMap<Integer, AMQSession> _channelId2SessionMap = new
ConcurrentHashMap<Integer, AMQSession>();
protected ConcurrentMap _closingChannels = new ConcurrentHashMap();
@@ -104,26 +104,13 @@
private VersionSpecificRegistry _registry =
MainRegistry.getVersionSpecificRegistry(ProtocolVersion.getLatestSupportedVersion());
- /**
- * No-arg constructor for use by test subclass - has to initialise final
vars NOT intended for use other then for
- * test
- */
- public AMQProtocolSession()
- {
- _protocolHandler = null;
- _minaProtocolSession = null;
- _stateManager = new AMQStateManager(this);
- }
+ private final AMQConnection _connection;
+
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession
protocolSession, AMQConnection connection)
{
- _protocolHandler = protocolHandler;
- _minaProtocolSession = protocolSession;
- // properties of the connection are made available to the event
handlers
- _minaProtocolSession.setAttribute(AMQ_CONNECTION, connection);
- // fixme - real value needed
- _minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
- _stateManager = new AMQStateManager(this);
+ this(protocolHandler,protocolSession,connection, new
AMQStateManager());
+
}
public AMQProtocolSession(AMQProtocolHandler protocolHandler, IoSession
protocolSession, AMQConnection connection,
@@ -138,6 +125,7 @@
_minaProtocolSession.setWriteTimeout(LAST_WRITE_FUTURE_JOIN_TIMEOUT);
_stateManager = stateManager;
_stateManager.setProtocolSession(this);
+ _connection = connection;
}
@@ -305,11 +293,16 @@
*/
private void deliverMessageToAMQSession(int channelId, UnprocessedMessage
msg)
{
- AMQSession session = (AMQSession) _channelId2SessionMap.get(channelId);
+ AMQSession session = getSession(channelId);
session.messageReceived(msg);
_channelId2UnprocessedMsgMap.remove(channelId);
}
+ protected AMQSession getSession(int channelId)
+ {
+ return _connection.getSession(channelId);
+ }
+
/**
* Convenience method that writes a frame to the protocol session.
Equivalent to calling
* getProtocolSession().write().
@@ -335,32 +328,6 @@
}
}
- public void addSessionByChannel(int channelId, AMQSession session)
- {
- if (channelId <= 0)
- {
- throw new IllegalArgumentException("Attempt to register a session
with a channel id <= zero");
- }
-
- if (session == null)
- {
- throw new IllegalArgumentException("Attempt to register a null
session");
- }
-
- _logger.debug("Add session with channel id " + channelId);
- _channelId2SessionMap.put(channelId, session);
- }
-
- public void removeSessionByChannel(int channelId)
- {
- if (channelId <= 0)
- {
- throw new IllegalArgumentException("Attempt to deregister a
session with a channel id <= zero");
- }
-
- _logger.debug("Removing session with channelId " + channelId);
- _channelId2SessionMap.remove(channelId);
- }
/**
* Starts the process of closing a session
@@ -393,11 +360,11 @@
*/
public boolean channelClosed(int channelId, AMQConstant code, String text)
throws AMQException
{
- final Integer chId = channelId;
+
// if this is not a response to an earlier request to close the channel
- if (_closingChannels.remove(chId) == null)
+ if (_closingChannels.remove(channelId) == null)
{
- final AMQSession session = (AMQSession)
_channelId2SessionMap.get(chId);
+ final AMQSession session = getSession(channelId);
try
{
session.closed(new AMQException(code, text));
@@ -469,8 +436,7 @@
public void confirmConsumerCancelled(int channelId, AMQShortString
consumerTag)
{
- final Integer chId = channelId;
- final AMQSession session = (AMQSession)
_channelId2SessionMap.get(chId);
+ final AMQSession session = getSession(channelId);
session.confirmConsumerCancelled(consumerTag);
}
Modified:
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
URL:
http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java?view=diff&rev=533721&r1=533720&r2=533721
==============================================================================
---
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
(original)
+++
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/protocol/AMQProtocolSessionTest.java
Mon Apr 30 05:24:41 2007
@@ -32,9 +32,6 @@
{
private static class AMQProtSession extends AMQProtocolSession
{
- public AMQProtSession()
- {
- }
public AMQProtSession(AMQProtocolHandler protocolHandler, IoSession
protocolSession, AMQConnection connection)
{