Repository: geode Updated Branches: refs/heads/feature/GEODE-2632 c43cc7aab -> 7a83cccb1 (forced update)
GEODE-2684 Connection & ConnectionTable cleanup removed dead code and indirect access of TcpConduit through the connection table. Project: http://git-wip-us.apache.org/repos/asf/geode/repo Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/6b2b7b2f Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/6b2b7b2f Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/6b2b7b2f Branch: refs/heads/feature/GEODE-2632 Commit: 6b2b7b2f7f3f63b8ae638e9afffa5edc0f763783 Parents: 391502a Author: Bruce Schuchardt <bschucha...@pivotal.io> Authored: Wed Apr 5 15:13:01 2017 -0700 Committer: Bruce Schuchardt <bschucha...@pivotal.io> Committed: Wed Apr 5 15:31:33 2017 -0700 ---------------------------------------------------------------------- .../apache/geode/internal/tcp/Connection.java | 188 ++++++------------- .../geode/internal/tcp/ConnectionTable.java | 68 ++----- .../geode/internal/tcp/DirectReplySender.java | 2 +- .../apache/geode/internal/tcp/MsgReader.java | 2 +- .../apache/geode/internal/tcp/NIOMsgReader.java | 2 +- 5 files changed, 82 insertions(+), 180 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index a0af245..c57a0ba 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -72,8 +72,6 @@ public class Connection implements Runnable { public final static int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical msg public final static int END_CHUNKED_MSG_TYPE = 0x4e; // last in a series of chunks public final static int DIRECT_ACK_BIT = 0x20; - // We no longer support early ack - // public final static int EARLY_ACK_BIT = 0x10; public static final int MSG_HEADER_SIZE_OFFSET = 0; public static final int MSG_HEADER_TYPE_OFFSET = 4; @@ -95,7 +93,9 @@ public class Connection implements Runnable { "member unexpectedly shut down shared, unordered connection"; /** the table holding this connection */ - final ConnectionTable owner; + private final ConnectionTable owner; + + private final TCPConduit conduit; /** * Set to false once run() is terminating. Using this instead of Thread.isAlive as the reader @@ -113,15 +113,6 @@ public class Connection implements Runnable { /** The idle timeout timer task for this connection */ private SystemTimerTask idleTask; - /** - * Returns the depth of unshared reader threads from this thread to the original - * non-reader-thread. E.g., ServerConnection -> reader(domino=1) -> reader(domino=2) -> - * reader(domino=3) - */ - public static int getDominoCount() { - return dominoCount.get().intValue(); - } - private final static ThreadLocal isReaderThread = new ThreadLocal(); public final static void makeReaderThread() { @@ -129,7 +120,7 @@ public class Connection implements Runnable { makeReaderThread(true); } - private final static void makeReaderThread(boolean v) { + private static void makeReaderThread(boolean v) { isReaderThread.set(v); } @@ -150,7 +141,7 @@ public class Connection implements Runnable { if (connectTimeoutStr != null) { P2P_CONNECT_TIMEOUT = Integer.parseInt(connectTimeoutStr); } else { - P2P_CONNECT_TIMEOUT = 6 * this.owner.owner.getDM().getConfig().getMemberTimeout(); + P2P_CONNECT_TIMEOUT = 6 * this.conduit.getDM().getConfig().getMemberTimeout(); } IS_P2P_CONNECT_TIMEOUT_INITIALIZED = true; return P2P_CONNECT_TIMEOUT; @@ -367,20 +358,18 @@ public class Connection implements Runnable { /** the buffer used for NIO message receipt */ ByteBuffer nioInputBuffer; - /** the position of the next message's content */ - // int nioMessageStart; - /** the length of the next message to be dispatched */ int nioMessageLength; - // byte nioMessageVersion; /** the type of message being received */ byte nioMessageType; /** used to lock access to destreamer data */ private final Object destreamerLock = new Object(); + /** caches a msg destreamer that is currently not being used */ MsgDestreamer idleMsgDestreamer; + /** * used to map a msgId to a MsgDestreamer which are used for destreaming chunked messages using * nio @@ -409,8 +398,6 @@ public class Connection implements Runnable { private int sendBufferSize = -1; private int recvBufferSize = -1; - private ReplySender replySender; - private void setSendBufferSize(Socket sock) { setSendBufferSize(sock, this.owner.getConduit().tcpBufferSize); } @@ -541,6 +528,7 @@ public class Connection implements Runnable { throw new IllegalArgumentException( LocalizedStrings.Connection_NULL_CONNECTIONTABLE.toLocalizedString()); } + this.conduit = t.getConduit(); this.isReceiver = true; this.owner = t; this.socket = socket; @@ -628,7 +616,7 @@ public class Connection implements Runnable { bytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((msglen / 0x100) & 0xff); bytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (msglen & 0xff); bytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE; // message type - bytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID / 0x100) & 0xff); + bytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 0xff); bytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff); bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK; int allocSize = bytes.length; @@ -707,19 +695,16 @@ public class Connection implements Runnable { my_okHandshakeBytes = okHandshakeBytes; } if (useNIO()) { + assert my_okHandshakeBuf != null; synchronized (my_okHandshakeBuf) { my_okHandshakeBuf.position(0); nioWriteFully(getSocket().getChannel(), my_okHandshakeBuf, false, null); } } else { synchronized (outLock) { - try { - // this.writerThread = Thread.currentThread(); - this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length); - this.output.flush(); - } finally { - // this.writerThread = null; - } + assert my_okHandshakeBytes != null; + this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length); + this.output.flush(); } } } @@ -832,7 +817,7 @@ public class Connection implements Runnable { /** * asynchronously close this connection * - * @param beingSick + * @param beingSick test hook to simulate sickness in communications & membership */ private void asyncClose(boolean beingSick) { // note: remoteAddr may be null if this is a receiver that hasn't finished its handshake @@ -890,8 +875,7 @@ public class Connection implements Runnable { InternalDistributedMember myAddr = this.owner.getConduit().getMemberId(); final MsgOutputStream connectHandshake = new MsgOutputStream(CONNECT_HANDSHAKE_SIZE); - // connectHandshake.reset(); - /** + /* * Note a byte of zero is always written because old products serialized a member id with always * sends an ip address. My reading of the ip-address specs indicated that the first byte of a * valid address would never be 0. @@ -925,8 +909,6 @@ public class Connection implements Runnable { private void handshakeStream() throws IOException { waitForAddressCompletion(); - // this.output = new BufferedOutputStream(getSocket().getOutputStream(), - // owner.getConduit().bufferSize); this.output = getSocket().getOutputStream(); ByteArrayOutputStream baos = new ByteArrayOutputStream(CONNECT_HANDSHAKE_SIZE); DataOutputStream os = new DataOutputStream(baos); @@ -961,17 +943,12 @@ public class Connection implements Runnable { lenbytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((len / 0x100) & 0xff); lenbytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (len & 0xff); lenbytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE; - lenbytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID / 0x100) & 0xff); + lenbytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 0xff); lenbytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff); synchronized (outLock) { - try { - // this.writerThread = Thread.currentThread(); - this.output.write(lenbytes, 0, lenbytes.length); - this.output.write(msg, 0, msg.length); - this.output.flush(); - } finally { - // this.writerThread = null; - } + this.output.write(lenbytes, 0, lenbytes.length); + this.output.write(msg, 0, msg.length); + this.output.flush(); } } @@ -1091,7 +1068,7 @@ public class Connection implements Runnable { // create connection try { conn = null; - conn = new Connection(mgr, t, preserveOrder, remoteAddr, sharedResource); + conn = new Connection(t, preserveOrder, remoteAddr, sharedResource); } catch (javax.net.ssl.SSLHandshakeException se) { // no need to retry if certificates were rejected throw se; @@ -1206,7 +1183,7 @@ public class Connection implements Runnable { private void setRemoteAddr(DistributedMember m) { this.remoteAddr = this.owner.getDM().getCanonicalId(m); - MembershipManager mgr = this.owner.owner.getMembershipManager(); + MembershipManager mgr = this.conduit.getMembershipManager(); mgr.addSurpriseMember(m); } @@ -1214,9 +1191,8 @@ public class Connection implements Runnable { * creates a new connection to a remote server. We are initiating this connection; the other side * must accept us We will almost always send messages; small acks are received. */ - private Connection(MembershipManager mgr, ConnectionTable t, boolean preserveOrder, - DistributedMember remoteID, boolean sharedResource) - throws IOException, DistributedSystemDisconnectedException { + private Connection(ConnectionTable t, boolean preserveOrder, DistributedMember remoteID, + boolean sharedResource) throws IOException, DistributedSystemDisconnectedException { // initialize a socket upfront. So that the InternalDistributedMember remoteAddr = (InternalDistributedMember) remoteID; @@ -1224,6 +1200,7 @@ public class Connection implements Runnable { throw new IllegalArgumentException( LocalizedStrings.Connection_CONNECTIONTABLE_IS_NULL.toLocalizedString()); } + this.conduit = t.getConduit(); this.isReceiver = false; this.owner = t; this.sharedResource = sharedResource; @@ -1248,7 +1225,7 @@ public class Connection implements Runnable { channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE); - /** + /* * If conserve-sockets is false, the socket can be used for receiving responses, so set the * receive buffer accordingly. */ @@ -1261,7 +1238,7 @@ public class Connection implements Runnable { setSendBufferSize(channel.socket()); channel.configureBlocking(true); - int connectTime = getP2PConnectTimeout();; + int connectTime = getP2PConnectTimeout(); try { channel.socket().connect(addr, connectTime); @@ -1276,7 +1253,7 @@ public class Connection implements Runnable { Thread.currentThread().interrupt(); } throw c; - } catch (CancelledKeyException e) { + } catch (CancelledKeyException | ClosedSelectorException e) { // bug #44469: for some reason NIO throws this runtime exception // instead of an IOException on timeouts ConnectException c = new ConnectException( @@ -1284,14 +1261,6 @@ public class Connection implements Runnable { .toLocalizedString(new Object[] {connectTime})); c.initCause(e); throw c; - } catch (ClosedSelectorException e) { - // bug #44808: for some reason JRockit NIO thorws this runtime exception - // instead of an IOException on timeouts - ConnectException c = new ConnectException( - LocalizedStrings.Connection_ATTEMPT_TO_CONNECT_TIMED_OUT_AFTER_0_MILLISECONDS - .toLocalizedString(new Object[] {connectTime})); - c.initCause(e); - throw c; } } finally { this.owner.removeConnectingSocket(channel.socket()); @@ -1309,7 +1278,6 @@ public class Connection implements Runnable { setSocketBufferSize(this.socket, false, socketBufferSize, true); setSendBufferSize(this.socket); } else { - // socket = new Socket(remoteAddr.getInetAddress(), remoteAddr.getPort()); Socket s = new Socket(); this.socket = s; s.setTcpNoDelay(true); @@ -1335,13 +1303,12 @@ public class Connection implements Runnable { * must not be doing it correctly. */ private static final boolean BATCH_SENDS = Boolean.getBoolean("p2p.batchSends"); - protected static final int BATCH_BUFFER_SIZE = + private static final int BATCH_BUFFER_SIZE = Integer.getInteger("p2p.batchBufferSize", 1024 * 1024).intValue(); - protected static final int BATCH_FLUSH_MS = - Integer.getInteger("p2p.batchFlushTime", 50).intValue(); - protected Object batchLock; - protected ByteBuffer fillBatchBuffer; - protected ByteBuffer sendBatchBuffer; + private static final int BATCH_FLUSH_MS = Integer.getInteger("p2p.batchFlushTime", 50).intValue(); + private Object batchLock; + private ByteBuffer fillBatchBuffer; + private ByteBuffer sendBatchBuffer; private BatchBufferFlusher batchFlusher; private void createBatchSendBuffer() { @@ -1446,13 +1413,7 @@ public class Connection implements Runnable { SocketChannel channel = getSocket().getChannel(); nioWriteFully(channel, sendBatchBuffer, false, null); sendBatchBuffer.clear(); - } catch (IOException ex) { - logger.fatal(LocalizedMessage.create( - LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0, ex)); - readerShuttingDown = true; - requestClose(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0 - .toLocalizedString(ex)); - } catch (ConnectionException ex) { + } catch (IOException | ConnectionException ex) { logger.fatal(LocalizedMessage.create( LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0, ex)); readerShuttingDown = true; @@ -1526,13 +1487,6 @@ public class Connection implements Runnable { return this.closing.get(); } - /** - * Used to close a connection that has not yet been registered with the distribution manager. - */ - void closePartialConnect(String reason) { - close(reason, false, false, false, false); - } - void closePartialConnect(String reason, boolean beingSick) { close(reason, false, false, beingSick, false); } @@ -1619,9 +1573,9 @@ public class Connection implements Runnable { // if network partition detection is enabled or this is an admin vm // we can't wait for the reader thread when running in an IBM JRE. See // bug 41889 - if (this.owner.owner.config.getEnableNetworkPartitionDetection() - || this.owner.owner.getMemberId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE - || this.owner.owner.getMemberId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { + if (this.conduit.config.getEnableNetworkPartitionDetection() + || this.conduit.getMemberId().getVmKind() == DistributionManager.ADMIN_ONLY_DM_TYPE + || this.conduit.getMemberId().getVmKind() == DistributionManager.LOCATOR_DM_TYPE) { isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor")); } { @@ -1673,6 +1627,7 @@ public class Connection implements Runnable { } } } else { + // noinspection ConstantConditions this.owner.removeSharedConnection(reason, this.remoteAddr, this.preserveOrder, this); } } else if (!this.isReceiver) { @@ -1735,7 +1690,7 @@ public class Connection implements Runnable { initiateSuspicionIfSharedUnordered(); if (this.isReceiver) { if (!this.sharedResource) { - this.owner.owner.stats.incThreadOwnedReceivers(-1L, dominoCount.get()); + this.conduit.stats.incThreadOwnedReceivers(-1L, dominoCount.get()); } asyncClose(false); this.owner.removeAndCloseThreadOwnedSockets(); @@ -1759,7 +1714,7 @@ public class Connection implements Runnable { } private String p2pReaderName() { - StringBuffer sb = new StringBuffer(64); + StringBuilder sb = new StringBuilder(64); if (this.isReceiver) { sb.append("P2P message reader@"); } else { @@ -1973,8 +1928,8 @@ public class Connection implements Runnable { } msg = msg.toLowerCase(); - return (msg.indexOf("forcibly closed") >= 0) || (msg.indexOf("reset by peer") >= 0) - || (msg.indexOf("connection reset") >= 0); + return (msg.contains("forcibly closed")) || (msg.contains("reset by peer")) + || (msg.contains("connection reset")); } private static boolean validMsgType(int msgType) { @@ -2012,7 +1967,7 @@ public class Connection implements Runnable { this.idleMsgDestreamer = null; } else { result = new MsgDestreamer(this.owner.getConduit().stats, - this.owner.owner.getCancelCriterion(), v); + this.conduit.getCancelCriterion(), v); } result.setName(p2pReaderName() + " msgId=" + msgId); this.destreamerMap.put(key, result); @@ -2103,7 +2058,7 @@ public class Connection implements Runnable { /* byte msgHdrVersion = */ calcHdrVersion(len); len = calcMsgByteSize(len); int msgType = lenbytes[MSG_HEADER_TYPE_OFFSET]; - short msgId = (short) ((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff * 0x100) + short msgId = (short) (((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff) << 8) + (lenbytes[MSG_HEADER_ID_OFFSET + 1] & 0xff)); boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0; if (myDirectAck) { @@ -2384,7 +2339,7 @@ public class Connection implements Runnable { // logger.fine("thread-owned receiver with domino count of " + dominoNumber + " // will prefer shared sockets"); } - this.owner.owner.stats.incThreadOwnedReceivers(1L, dominoNumber); + this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber); } if (logger.isDebugEnabled()) { @@ -2541,11 +2496,6 @@ public class Connection implements Runnable { } bytesSoFar += bytesThisTime; } catch (InterruptedIOException io) { - // try { Thread.sleep(10); } - // catch (InterruptedException ie) { - // Thread.currentThread().interrupt(); - // } - // Current thread has been interrupted. Regard it similar to an EOF this.readerShuttingDown = true; try { @@ -2582,7 +2532,7 @@ public class Connection implements Runnable { final boolean origSocketInUse = this.socketInUse; byte originalState = -1; synchronized (stateLock) { - originalState = this.connectionState;; + originalState = this.connectionState; this.connectionState = STATE_SENDING; } this.socketInUse = true; @@ -2597,13 +2547,8 @@ public class Connection implements Runnable { } else { byte[] bytesToWrite = getBytesToWrite(buffer); synchronized (outLock) { - try { - // this.writerThread = Thread.currentThread(); - this.output.write(bytesToWrite); - this.output.flush(); - } finally { - // this.writerThread = null; - } + this.output.write(bytesToWrite); + this.output.flush(); } } } @@ -2763,15 +2708,7 @@ public class Connection implements Runnable { return bytesToWrite; } - // private String socketInfo() { - // return (" socket: " + getSocket().getLocalAddress() + ":" + getSocket().getLocalPort() + " -> " - // + - // getSocket().getInetAddress() + ":" + getSocket().getPort() + " connection = " + - // System.identityHashCode(this)); - // - // } - - private final boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force) + private boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, boolean force) throws ConnectionException { final DMStats stats = this.owner.getConduit().stats; long start = DistributionStats.getStatTime(); @@ -2891,7 +2828,7 @@ public class Connection implements Runnable { * * @throws ConnectionException if the conduit has stopped */ - private final boolean handleBlockedWrite(ByteBuffer buffer, DistributionMessage msg) + private boolean handleBlockedWrite(ByteBuffer buffer, DistributionMessage msg) throws ConnectionException { if (!addToQueue(buffer, msg, true)) { return false; @@ -2931,7 +2868,7 @@ public class Connection implements Runnable { this.pusherThread.start(); } - private final ByteBuffer takeFromOutgoingQueue() throws InterruptedException { + private ByteBuffer takeFromOutgoingQueue() throws InterruptedException { ByteBuffer result = null; final DMStats stats = this.owner.getConduit().stats; long start = DistributionStats.getStatTime(); @@ -3152,7 +3089,7 @@ public class Connection implements Runnable { * Return false if socket writes to be done async/nonblocking Return true if socket writes to be * done sync/blocking */ - private final boolean useSyncWrites(boolean forceAsync) { + private boolean useSyncWrites(boolean forceAsync) { if (forceAsync) { return false; } @@ -3185,7 +3122,7 @@ public class Connection implements Runnable { static private final int MAX_WAIT_TIME = (1 << 5); // ms (must be a power of 2) - private final void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean forceAsync, + private void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean forceAsync, DistributionMessage p_msg, final DMStats stats) throws IOException { DistributionMessage msg = p_msg; // async/non-blocking @@ -3301,7 +3238,7 @@ public class Connection implements Runnable { if (msToWait <= 0) { Thread.yield(); } else { - boolean interrupted = Thread.interrupted();; + boolean interrupted = Thread.interrupted(); try { Thread.sleep(msToWait); } catch (InterruptedException ex) { @@ -3401,10 +3338,10 @@ public class Connection implements Runnable { /** * stateLock is used to synchronize state changes. */ - protected Object stateLock = new Object(); + private final Object stateLock = new Object(); /** for timeout processing, this is the current state of the connection */ - protected byte connectionState = STATE_IDLE; + private byte connectionState = STATE_IDLE; /* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */ /** the connection is idle, but may be in use */ @@ -3420,16 +3357,11 @@ public class Connection implements Runnable { /** the connection is in use and is reading a message */ protected static final byte STATE_READING = 5; - protected static final String[] STATE_NAMES = - new String[] {"idle", "sending", "post_sending", "reading_ack", "received_ack", "reading"}; /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */ /** set to true if we exceeded the ack-wait-threshold waiting for a response */ protected volatile boolean ackTimedOut; - private static int ACK_SIZE = 1; - private static byte ACK_BYTE = 37; - /** * @param msToWait number of milliseconds to wait for an ack. If 0 then wait forever. * @param msInterval interval between checks @@ -3899,7 +3831,7 @@ public class Connection implements Runnable { // } else { // ConnectionTable.threadWantsSharedResources(); } - this.owner.owner.stats.incThreadOwnedReceivers(1L, dominoNumber); + this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber); // Because this thread is not shared resource, it will be used for direct // ack. Direct ack messages can be large. This call will resize the send // buffer. @@ -4039,6 +3971,10 @@ public class Connection implements Runnable { } } + protected TCPConduit getConduit() { + return this.conduit; + } + protected Socket getSocket() throws SocketException { // fix for bug 37286 Socket result = this.socket; @@ -4177,7 +4113,7 @@ public class Connection implements Runnable { boolean nioChecked; boolean useNIO; - private final boolean useNIO() { + private boolean useNIO() { if (TCPConduit.useSSL) { return false; } @@ -4193,7 +4129,7 @@ public class Connection implements Runnable { if (this.socket != null && (this.socket.getInetAddress() instanceof Inet6Address)) { String os = System.getProperty("os.name"); if (os != null) { - if (os.indexOf("Windows") != -1) { + if (os.contains("Windows")) { this.useNIO = false; } } http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java index 08a9009..c55af82 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java @@ -64,17 +64,9 @@ import org.apache.geode.internal.logging.log4j.LocalizedMessage; * * @since GemFire 2.1 */ -/* - * Note: We no longer use InputMultiplexer If InputMux is reinstated then the manager needs to be - * initialized and all lines that have a NOMUX preface should be uncommented - * - */ public class ConnectionTable { private static final Logger logger = LogService.getLogger(); - /** a random number generator for secondary connection selection */ - // static java.util.Random random = new java.util.Random(); - /** warning when descriptor limit reached */ private static boolean ulimitWarningIssued; @@ -82,6 +74,7 @@ public class ConnectionTable { * true if the current thread wants non-shared resources */ private static ThreadLocal threadWantsOwnResources = new ThreadLocal(); + /** * Used for messages whose order must be preserved Only connections used for sending messages, and * receiving acks, will be put in this map. @@ -132,9 +125,7 @@ public class ConnectionTable { /** * the conduit for this table */ - protected final TCPConduit owner; - // ARB: temp making this protected to provide access to Connection. - // private final TCPConduit owner; + private final TCPConduit owner; /** * true if this table is no longer in use @@ -205,17 +196,10 @@ public class ConnectionTable { return (Boolean) threadWantsOwnResources.get(); } - // public static void setThreadOwnsResourcesRegistration( - // Boolean newValue) { - // threadWantsOwnResources.set(newValue); - // } - // private Map connections = new HashMap(); - /* NOMUX: private InputMuxManager inputMuxManager; */ - // private int lowWater; - // private int highWater; + public TCPConduit getOwner() { + return owner; + } - // private static boolean TRACK_SERVER_CONNECTIONS = - // System.getProperty("p2p.bidirectional", "true").equals("true"); private ConnectionTable(TCPConduit c) throws IOException { this.owner = c; @@ -226,10 +210,6 @@ public class ConnectionTable { this.threadConnectionMap = new ConcurrentHashMap(); this.p2pReaderThreadPool = createThreadPoolForIO(c.getDM().getSystem().isShareSockets()); this.socketCloser = new SocketCloser(); - /* - * NOMUX: if (TCPConduit.useNIO) { inputMuxManager = new InputMuxManager(this); - * inputMuxManager.start(c.logger); } - */ } private Executor createThreadPoolForIO(boolean conserveSockets) { @@ -306,7 +286,6 @@ public class ConnectionTable { } } - // Stub id = conn.getRemoteId(); if (conn != null) { synchronized (this.receivers) { this.owner.stats.incReceivers(); @@ -322,22 +301,9 @@ public class ConnectionTable { conn.remoteAddr); } } - // cleanupHighWater(); } - // /** returns the connection associated with the given key, or null if - // no such connection exists */ - // protected Connection basicGet(Serializable id) { - // synchronized (this.orderedConnectionMap) { - // return (Connection) this.orderedConnectionMap.get(id); - // } - // } - - // protected Connection get(Serializable id) throws java.io.IOException { - // return get(id, false); - // } - /** * Process a newly created PendingConnection @@ -432,10 +398,10 @@ public class ConnectionTable { } /** - * unordered or conserve-sockets note that unordered connections are currently always shared + * unordered or conserve-sockets=true note that unordered connections are currently always shared * * @param id the DistributedMember on which we are creating a connection - * @param threadOwnsResources whether unordered conn is owned by the current thread + * @param scheduleTimeout whether unordered connection should time out * @param preserveOrder whether to preserve order * @param startTime the ms clock start time for the operation * @param ackTimeout the ms ack-wait-threshold, or zero @@ -444,9 +410,9 @@ public class ConnectionTable { * @throws IOException if unable to create the connection * @throws DistributedSystemDisconnectedException */ - private Connection getUnorderedOrConserveSockets(DistributedMember id, - boolean threadOwnsResources, boolean preserveOrder, long startTime, long ackTimeout, - long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { + private Connection getSharedConnection(DistributedMember id, boolean scheduleTimeout, + boolean preserveOrder, long startTime, long ackTimeout, long ackSATimeout) + throws IOException, DistributedSystemDisconnectedException { Connection result = null; final Map m = preserveOrder ? this.orderedConnectionMap : this.unorderedConnectionMap; @@ -472,7 +438,7 @@ public class ConnectionTable { if (pc != null) { result = handleNewPendingConnection(id, true /* fixes bug 43386 */, preserveOrder, m, pc, startTime, ackTimeout, ackSATimeout); - if (!preserveOrder && threadOwnsResources) { + if (!preserveOrder && scheduleTimeout) { scheduleIdleTimeout(result); } } else { // we have existing connection @@ -487,10 +453,10 @@ public class ConnectionTable { startTime, ackTimeout, ackSATimeout); if (logger.isDebugEnabled()) { if (result != null) { - logger.debug("getUnorderedOrConserveSockets {} myAddr={} theirAddr={}", result, + logger.debug("getSharedConnection {} myAddr={} theirAddr={}", result, getConduit().getMemberId(), result.remoteAddr); } else { - logger.debug("getUnorderedOrConserveSockets: Connect failed"); + logger.debug("getSharedConnection: Connect failed"); } } } else { @@ -512,7 +478,7 @@ public class ConnectionTable { * @throws IOException if the connection could not be created * @throws DistributedSystemDisconnectedException */ - Connection getOrderedAndOwned(DistributedMember id, long startTime, long ackTimeout, + Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout, long ackSATimeout) throws IOException, DistributedSystemDisconnectedException { Connection result = null; @@ -658,10 +624,10 @@ public class ConnectionTable { Connection result = null; boolean threadOwnsResources = threadOwnsResources(); if (!preserveOrder || !threadOwnsResources) { - result = getUnorderedOrConserveSockets(id, threadOwnsResources, preserveOrder, startTime, - ackTimeout, ackSATimeout); + result = getSharedConnection(id, threadOwnsResources, preserveOrder, startTime, ackTimeout, + ackSATimeout); } else { - result = getOrderedAndOwned(id, startTime, ackTimeout, ackSATimeout); + result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout); } if (result != null) { Assert.assertTrue(result.preserveOrder == preserveOrder); http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java index 3872ee9..bf06953 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java @@ -56,7 +56,7 @@ class DirectReplySender implements ReplySender { // mutates the list when it has exceptions. // fix for bug #42199 - cancellation check - this.conn.owner.getDM().getCancelCriterion().checkCancelInProgress(null); + this.conn.getConduit().getDM().getCancelCriterion().checkCancelInProgress(null); if (logger.isTraceEnabled(LogMarker.DM)) { logger.trace(LogMarker.DM, "Sending a direct reply {} to {}", msg, conn.getRemoteAddress()); http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java index fc56271..be1f533 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java @@ -94,7 +94,7 @@ public abstract class MsgReader { public abstract ByteBuffer readAtLeast(int bytes) throws IOException; protected DMStats getStats() { - return conn.owner.getConduit().stats; + return conn.getConduit().stats; } public static class Header { http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java ---------------------------------------------------------------------- diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java index 50f5fae..a4e35a4 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java @@ -79,7 +79,7 @@ public class NIOMsgReader extends MsgReader { if (nioInputBuffer == null) { int allocSize = conn.getReceiveBufferSize(); if (allocSize == -1) { - allocSize = conn.owner.getConduit().tcpBufferSize; + allocSize = conn.getConduit().tcpBufferSize; } if (allocSize > bufferSize) { bufferSize = allocSize;