Repository: geode
Updated Branches:
  refs/heads/feature/GEODE-2632 c43cc7aab -> 7a83cccb1 (forced update)


GEODE-2684 Connection & ConnectionTable cleanup

Removed dead code and replaced indirect access to the TCPConduit through
the connection table with a direct reference held by each Connection.


Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/6b2b7b2f
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/6b2b7b2f
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/6b2b7b2f

Branch: refs/heads/feature/GEODE-2632
Commit: 6b2b7b2f7f3f63b8ae638e9afffa5edc0f763783
Parents: 391502a
Author: Bruce Schuchardt <bschucha...@pivotal.io>
Authored: Wed Apr 5 15:13:01 2017 -0700
Committer: Bruce Schuchardt <bschucha...@pivotal.io>
Committed: Wed Apr 5 15:31:33 2017 -0700

----------------------------------------------------------------------
 .../apache/geode/internal/tcp/Connection.java   | 188 ++++++-------------
 .../geode/internal/tcp/ConnectionTable.java     |  68 ++-----
 .../geode/internal/tcp/DirectReplySender.java   |   2 +-
 .../apache/geode/internal/tcp/MsgReader.java    |   2 +-
 .../apache/geode/internal/tcp/NIOMsgReader.java |   2 +-
 5 files changed, 82 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index a0af245..c57a0ba 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -72,8 +72,6 @@ public class Connection implements Runnable {
   public final static int CHUNKED_MSG_TYPE = 0x4d; // a chunk of one logical 
msg
   public final static int END_CHUNKED_MSG_TYPE = 0x4e; // last in a series of 
chunks
   public final static int DIRECT_ACK_BIT = 0x20;
-  // We no longer support early ack
-  // public final static int EARLY_ACK_BIT = 0x10;
 
   public static final int MSG_HEADER_SIZE_OFFSET = 0;
   public static final int MSG_HEADER_TYPE_OFFSET = 4;
@@ -95,7 +93,9 @@ public class Connection implements Runnable {
       "member unexpectedly shut down shared, unordered connection";
 
   /** the table holding this connection */
-  final ConnectionTable owner;
+  private final ConnectionTable owner;
+
+  private final TCPConduit conduit;
 
   /**
    * Set to false once run() is terminating. Using this instead of 
Thread.isAlive as the reader
@@ -113,15 +113,6 @@ public class Connection implements Runnable {
   /** The idle timeout timer task for this connection */
   private SystemTimerTask idleTask;
 
-  /**
-   * Returns the depth of unshared reader threads from this thread to the 
original
-   * non-reader-thread. E.g., ServerConnection -> reader(domino=1) -> 
reader(domino=2) ->
-   * reader(domino=3)
-   */
-  public static int getDominoCount() {
-    return dominoCount.get().intValue();
-  }
-
   private final static ThreadLocal isReaderThread = new ThreadLocal();
 
   public final static void makeReaderThread() {
@@ -129,7 +120,7 @@ public class Connection implements Runnable {
     makeReaderThread(true);
   }
 
-  private final static void makeReaderThread(boolean v) {
+  private static void makeReaderThread(boolean v) {
     isReaderThread.set(v);
   }
 
@@ -150,7 +141,7 @@ public class Connection implements Runnable {
     if (connectTimeoutStr != null) {
       P2P_CONNECT_TIMEOUT = Integer.parseInt(connectTimeoutStr);
     } else {
-      P2P_CONNECT_TIMEOUT = 6 * 
this.owner.owner.getDM().getConfig().getMemberTimeout();
+      P2P_CONNECT_TIMEOUT = 6 * 
this.conduit.getDM().getConfig().getMemberTimeout();
     }
     IS_P2P_CONNECT_TIMEOUT_INITIALIZED = true;
     return P2P_CONNECT_TIMEOUT;
@@ -367,20 +358,18 @@ public class Connection implements Runnable {
   /** the buffer used for NIO message receipt */
   ByteBuffer nioInputBuffer;
 
-  /** the position of the next message's content */
-  // int nioMessageStart;
-
   /** the length of the next message to be dispatched */
   int nioMessageLength;
-  // byte nioMessageVersion;
 
   /** the type of message being received */
   byte nioMessageType;
 
   /** used to lock access to destreamer data */
   private final Object destreamerLock = new Object();
+
   /** caches a msg destreamer that is currently not being used */
   MsgDestreamer idleMsgDestreamer;
+
   /**
    * used to map a msgId to a MsgDestreamer which are used for destreaming 
chunked messages using
    * nio
@@ -409,8 +398,6 @@ public class Connection implements Runnable {
   private int sendBufferSize = -1;
   private int recvBufferSize = -1;
 
-  private ReplySender replySender;
-
   private void setSendBufferSize(Socket sock) {
     setSendBufferSize(sock, this.owner.getConduit().tcpBufferSize);
   }
@@ -541,6 +528,7 @@ public class Connection implements Runnable {
       throw new IllegalArgumentException(
           
LocalizedStrings.Connection_NULL_CONNECTIONTABLE.toLocalizedString());
     }
+    this.conduit = t.getConduit();
     this.isReceiver = true;
     this.owner = t;
     this.socket = socket;
@@ -628,7 +616,7 @@ public class Connection implements Runnable {
     bytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((msglen / 0x100) & 0xff);
     bytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (msglen & 0xff);
     bytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE; // message type
-    bytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID / 0x100) & 
0xff);
+    bytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 
0xff);
     bytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 0xff);
     bytes[MSG_HEADER_BYTES] = REPLY_CODE_OK;
     int allocSize = bytes.length;
@@ -707,19 +695,16 @@ public class Connection implements Runnable {
       my_okHandshakeBytes = okHandshakeBytes;
     }
     if (useNIO()) {
+      assert my_okHandshakeBuf != null;
       synchronized (my_okHandshakeBuf) {
         my_okHandshakeBuf.position(0);
         nioWriteFully(getSocket().getChannel(), my_okHandshakeBuf, false, 
null);
       }
     } else {
       synchronized (outLock) {
-        try {
-          // this.writerThread = Thread.currentThread();
-          this.output.write(my_okHandshakeBytes, 0, 
my_okHandshakeBytes.length);
-          this.output.flush();
-        } finally {
-          // this.writerThread = null;
-        }
+        assert my_okHandshakeBytes != null;
+        this.output.write(my_okHandshakeBytes, 0, my_okHandshakeBytes.length);
+        this.output.flush();
       }
     }
   }
@@ -832,7 +817,7 @@ public class Connection implements Runnable {
   /**
    * asynchronously close this connection
    * 
-   * @param beingSick
+   * @param beingSick test hook to simulate sickness in communications & 
membership
    */
   private void asyncClose(boolean beingSick) {
     // note: remoteAddr may be null if this is a receiver that hasn't finished 
its handshake
@@ -890,8 +875,7 @@ public class Connection implements Runnable {
 
     InternalDistributedMember myAddr = this.owner.getConduit().getMemberId();
     final MsgOutputStream connectHandshake = new 
MsgOutputStream(CONNECT_HANDSHAKE_SIZE);
-    // connectHandshake.reset();
-    /**
+    /*
      * Note a byte of zero is always written because old products serialized a 
member id with always
      * sends an ip address. My reading of the ip-address specs indicated that 
the first byte of a
      * valid address would never be 0.
@@ -925,8 +909,6 @@ public class Connection implements Runnable {
   private void handshakeStream() throws IOException {
     waitForAddressCompletion();
 
-    // this.output = new BufferedOutputStream(getSocket().getOutputStream(),
-    // owner.getConduit().bufferSize);
     this.output = getSocket().getOutputStream();
     ByteArrayOutputStream baos = new 
ByteArrayOutputStream(CONNECT_HANDSHAKE_SIZE);
     DataOutputStream os = new DataOutputStream(baos);
@@ -961,17 +943,12 @@ public class Connection implements Runnable {
     lenbytes[MSG_HEADER_SIZE_OFFSET + 2] = (byte) ((len / 0x100) & 0xff);
     lenbytes[MSG_HEADER_SIZE_OFFSET + 3] = (byte) (len & 0xff);
     lenbytes[MSG_HEADER_TYPE_OFFSET] = (byte) NORMAL_MSG_TYPE;
-    lenbytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID / 
0x100) & 0xff);
+    lenbytes[MSG_HEADER_ID_OFFSET] = (byte) ((MsgIdGenerator.NO_MSG_ID >> 8) & 
0xff);
     lenbytes[MSG_HEADER_ID_OFFSET + 1] = (byte) (MsgIdGenerator.NO_MSG_ID & 
0xff);
     synchronized (outLock) {
-      try {
-        // this.writerThread = Thread.currentThread();
-        this.output.write(lenbytes, 0, lenbytes.length);
-        this.output.write(msg, 0, msg.length);
-        this.output.flush();
-      } finally {
-        // this.writerThread = null;
-      }
+      this.output.write(lenbytes, 0, lenbytes.length);
+      this.output.write(msg, 0, msg.length);
+      this.output.flush();
     }
   }
 
@@ -1091,7 +1068,7 @@ public class Connection implements Runnable {
         // create connection
         try {
           conn = null;
-          conn = new Connection(mgr, t, preserveOrder, remoteAddr, 
sharedResource);
+          conn = new Connection(t, preserveOrder, remoteAddr, sharedResource);
         } catch (javax.net.ssl.SSLHandshakeException se) {
           // no need to retry if certificates were rejected
           throw se;
@@ -1206,7 +1183,7 @@ public class Connection implements Runnable {
 
   private void setRemoteAddr(DistributedMember m) {
     this.remoteAddr = this.owner.getDM().getCanonicalId(m);
-    MembershipManager mgr = this.owner.owner.getMembershipManager();
+    MembershipManager mgr = this.conduit.getMembershipManager();
     mgr.addSurpriseMember(m);
   }
 
@@ -1214,9 +1191,8 @@ public class Connection implements Runnable {
    * creates a new connection to a remote server. We are initiating this 
connection; the other side
    * must accept us We will almost always send messages; small acks are 
received.
    */
-  private Connection(MembershipManager mgr, ConnectionTable t, boolean 
preserveOrder,
-      DistributedMember remoteID, boolean sharedResource)
-      throws IOException, DistributedSystemDisconnectedException {
+  private Connection(ConnectionTable t, boolean preserveOrder, 
DistributedMember remoteID,
+      boolean sharedResource) throws IOException, 
DistributedSystemDisconnectedException {
 
     // initialize a socket upfront. So that the
     InternalDistributedMember remoteAddr = (InternalDistributedMember) 
remoteID;
@@ -1224,6 +1200,7 @@ public class Connection implements Runnable {
       throw new IllegalArgumentException(
           
LocalizedStrings.Connection_CONNECTIONTABLE_IS_NULL.toLocalizedString());
     }
+    this.conduit = t.getConduit();
     this.isReceiver = false;
     this.owner = t;
     this.sharedResource = sharedResource;
@@ -1248,7 +1225,7 @@ public class Connection implements Runnable {
 
         channel.socket().setKeepAlive(SocketCreator.ENABLE_TCP_KEEP_ALIVE);
 
-        /**
+        /*
          * If conserve-sockets is false, the socket can be used for receiving 
responses, so set the
          * receive buffer accordingly.
          */
@@ -1261,7 +1238,7 @@ public class Connection implements Runnable {
         setSendBufferSize(channel.socket());
         channel.configureBlocking(true);
 
-        int connectTime = getP2PConnectTimeout();;
+        int connectTime = getP2PConnectTimeout();
 
         try {
           channel.socket().connect(addr, connectTime);
@@ -1276,7 +1253,7 @@ public class Connection implements Runnable {
             Thread.currentThread().interrupt();
           }
           throw c;
-        } catch (CancelledKeyException e) {
+        } catch (CancelledKeyException | ClosedSelectorException e) {
           // bug #44469: for some reason NIO throws this runtime exception
           // instead of an IOException on timeouts
           ConnectException c = new ConnectException(
@@ -1284,14 +1261,6 @@ public class Connection implements Runnable {
                   .toLocalizedString(new Object[] {connectTime}));
           c.initCause(e);
           throw c;
-        } catch (ClosedSelectorException e) {
-          // bug #44808: for some reason JRockit NIO thorws this runtime 
exception
-          // instead of an IOException on timeouts
-          ConnectException c = new ConnectException(
-              
LocalizedStrings.Connection_ATTEMPT_TO_CONNECT_TIMED_OUT_AFTER_0_MILLISECONDS
-                  .toLocalizedString(new Object[] {connectTime}));
-          c.initCause(e);
-          throw c;
         }
       } finally {
         this.owner.removeConnectingSocket(channel.socket());
@@ -1309,7 +1278,6 @@ public class Connection implements Runnable {
         setSocketBufferSize(this.socket, false, socketBufferSize, true);
         setSendBufferSize(this.socket);
       } else {
-        // socket = new Socket(remoteAddr.getInetAddress(), 
remoteAddr.getPort());
         Socket s = new Socket();
         this.socket = s;
         s.setTcpNoDelay(true);
@@ -1335,13 +1303,12 @@ public class Connection implements Runnable {
    * must not be doing it correctly.
    */
   private static final boolean BATCH_SENDS = 
Boolean.getBoolean("p2p.batchSends");
-  protected static final int BATCH_BUFFER_SIZE =
+  private static final int BATCH_BUFFER_SIZE =
       Integer.getInteger("p2p.batchBufferSize", 1024 * 1024).intValue();
-  protected static final int BATCH_FLUSH_MS =
-      Integer.getInteger("p2p.batchFlushTime", 50).intValue();
-  protected Object batchLock;
-  protected ByteBuffer fillBatchBuffer;
-  protected ByteBuffer sendBatchBuffer;
+  private static final int BATCH_FLUSH_MS = 
Integer.getInteger("p2p.batchFlushTime", 50).intValue();
+  private Object batchLock;
+  private ByteBuffer fillBatchBuffer;
+  private ByteBuffer sendBatchBuffer;
   private BatchBufferFlusher batchFlusher;
 
   private void createBatchSendBuffer() {
@@ -1446,13 +1413,7 @@ public class Connection implements Runnable {
                   SocketChannel channel = getSocket().getChannel();
                   nioWriteFully(channel, sendBatchBuffer, false, null);
                   sendBatchBuffer.clear();
-                } catch (IOException ex) {
-                  logger.fatal(LocalizedMessage.create(
-                      
LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0, ex));
-                  readerShuttingDown = true;
-                  
requestClose(LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0
-                      .toLocalizedString(ex));
-                } catch (ConnectionException ex) {
+                } catch (IOException | ConnectionException ex) {
                   logger.fatal(LocalizedMessage.create(
                       
LocalizedStrings.Connection_EXCEPTION_FLUSHING_BATCH_SEND_BUFFER_0, ex));
                   readerShuttingDown = true;
@@ -1526,13 +1487,6 @@ public class Connection implements Runnable {
     return this.closing.get();
   }
 
-  /**
-   * Used to close a connection that has not yet been registered with the 
distribution manager.
-   */
-  void closePartialConnect(String reason) {
-    close(reason, false, false, false, false);
-  }
-
   void closePartialConnect(String reason, boolean beingSick) {
     close(reason, false, false, beingSick, false);
   }
@@ -1619,9 +1573,9 @@ public class Connection implements Runnable {
       // if network partition detection is enabled or this is an admin vm
       // we can't wait for the reader thread when running in an IBM JRE. See
       // bug 41889
-      if (this.owner.owner.config.getEnableNetworkPartitionDetection()
-          || this.owner.owner.getMemberId().getVmKind() == 
DistributionManager.ADMIN_ONLY_DM_TYPE
-          || this.owner.owner.getMemberId().getVmKind() == 
DistributionManager.LOCATOR_DM_TYPE) {
+      if (this.conduit.config.getEnableNetworkPartitionDetection()
+          || this.conduit.getMemberId().getVmKind() == 
DistributionManager.ADMIN_ONLY_DM_TYPE
+          || this.conduit.getMemberId().getVmKind() == 
DistributionManager.LOCATOR_DM_TYPE) {
         isIBM = "IBM Corporation".equals(System.getProperty("java.vm.vendor"));
       }
       {
@@ -1673,6 +1627,7 @@ public class Connection implements Runnable {
               }
             }
           } else {
+            // noinspection ConstantConditions
             this.owner.removeSharedConnection(reason, this.remoteAddr, 
this.preserveOrder, this);
           }
         } else if (!this.isReceiver) {
@@ -1735,7 +1690,7 @@ public class Connection implements Runnable {
       initiateSuspicionIfSharedUnordered();
       if (this.isReceiver) {
         if (!this.sharedResource) {
-          this.owner.owner.stats.incThreadOwnedReceivers(-1L, 
dominoCount.get());
+          this.conduit.stats.incThreadOwnedReceivers(-1L, dominoCount.get());
         }
         asyncClose(false);
         this.owner.removeAndCloseThreadOwnedSockets();
@@ -1759,7 +1714,7 @@ public class Connection implements Runnable {
   }
 
   private String p2pReaderName() {
-    StringBuffer sb = new StringBuffer(64);
+    StringBuilder sb = new StringBuilder(64);
     if (this.isReceiver) {
       sb.append("P2P message reader@");
     } else {
@@ -1973,8 +1928,8 @@ public class Connection implements Runnable {
     }
 
     msg = msg.toLowerCase();
-    return (msg.indexOf("forcibly closed") >= 0) || (msg.indexOf("reset by 
peer") >= 0)
-        || (msg.indexOf("connection reset") >= 0);
+    return (msg.contains("forcibly closed")) || (msg.contains("reset by peer"))
+        || (msg.contains("connection reset"));
   }
 
   private static boolean validMsgType(int msgType) {
@@ -2012,7 +1967,7 @@ public class Connection implements Runnable {
           this.idleMsgDestreamer = null;
         } else {
           result = new MsgDestreamer(this.owner.getConduit().stats,
-              this.owner.owner.getCancelCriterion(), v);
+              this.conduit.getCancelCriterion(), v);
         }
         result.setName(p2pReaderName() + " msgId=" + msgId);
         this.destreamerMap.put(key, result);
@@ -2103,7 +2058,7 @@ public class Connection implements Runnable {
         /* byte msgHdrVersion = */ calcHdrVersion(len);
         len = calcMsgByteSize(len);
         int msgType = lenbytes[MSG_HEADER_TYPE_OFFSET];
-        short msgId = (short) ((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff * 0x100)
+        short msgId = (short) (((lenbytes[MSG_HEADER_ID_OFFSET] & 0xff) << 8)
             + (lenbytes[MSG_HEADER_ID_OFFSET + 1] & 0xff));
         boolean myDirectAck = (msgType & DIRECT_ACK_BIT) != 0;
         if (myDirectAck) {
@@ -2384,7 +2339,7 @@ public class Connection implements Runnable {
                   // logger.fine("thread-owned receiver with domino count of " 
+ dominoNumber + "
                   // will prefer shared sockets");
                 }
-                this.owner.owner.stats.incThreadOwnedReceivers(1L, 
dominoNumber);
+                this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber);
               }
 
               if (logger.isDebugEnabled()) {
@@ -2541,11 +2496,6 @@ public class Connection implements Runnable {
         }
         bytesSoFar += bytesThisTime;
       } catch (InterruptedIOException io) {
-        // try { Thread.sleep(10); }
-        // catch (InterruptedException ie) {
-        // Thread.currentThread().interrupt();
-        // }
-
         // Current thread has been interrupted. Regard it similar to an EOF
         this.readerShuttingDown = true;
         try {
@@ -2582,7 +2532,7 @@ public class Connection implements Runnable {
     final boolean origSocketInUse = this.socketInUse;
     byte originalState = -1;
     synchronized (stateLock) {
-      originalState = this.connectionState;;
+      originalState = this.connectionState;
       this.connectionState = STATE_SENDING;
     }
     this.socketInUse = true;
@@ -2597,13 +2547,8 @@ public class Connection implements Runnable {
         } else {
           byte[] bytesToWrite = getBytesToWrite(buffer);
           synchronized (outLock) {
-            try {
-              // this.writerThread = Thread.currentThread();
-              this.output.write(bytesToWrite);
-              this.output.flush();
-            } finally {
-              // this.writerThread = null;
-            }
+            this.output.write(bytesToWrite);
+            this.output.flush();
           }
         }
       }
@@ -2763,15 +2708,7 @@ public class Connection implements Runnable {
     return bytesToWrite;
   }
 
-  // private String socketInfo() {
-  // return (" socket: " + getSocket().getLocalAddress() + ":" + 
getSocket().getLocalPort() + " -> "
-  // +
-  // getSocket().getInetAddress() + ":" + getSocket().getPort() + " connection 
= " +
-  // System.identityHashCode(this));
-  //
-  // }
-
-  private final boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, 
boolean force)
+  private boolean addToQueue(ByteBuffer buffer, DistributionMessage msg, 
boolean force)
       throws ConnectionException {
     final DMStats stats = this.owner.getConduit().stats;
     long start = DistributionStats.getStatTime();
@@ -2891,7 +2828,7 @@ public class Connection implements Runnable {
    * 
    * @throws ConnectionException if the conduit has stopped
    */
-  private final boolean handleBlockedWrite(ByteBuffer buffer, 
DistributionMessage msg)
+  private boolean handleBlockedWrite(ByteBuffer buffer, DistributionMessage 
msg)
       throws ConnectionException {
     if (!addToQueue(buffer, msg, true)) {
       return false;
@@ -2931,7 +2868,7 @@ public class Connection implements Runnable {
     this.pusherThread.start();
   }
 
-  private final ByteBuffer takeFromOutgoingQueue() throws InterruptedException 
{
+  private ByteBuffer takeFromOutgoingQueue() throws InterruptedException {
     ByteBuffer result = null;
     final DMStats stats = this.owner.getConduit().stats;
     long start = DistributionStats.getStatTime();
@@ -3152,7 +3089,7 @@ public class Connection implements Runnable {
    * Return false if socket writes to be done async/nonblocking Return true if 
socket writes to be
    * done sync/blocking
    */
-  private final boolean useSyncWrites(boolean forceAsync) {
+  private boolean useSyncWrites(boolean forceAsync) {
     if (forceAsync) {
       return false;
     }
@@ -3185,7 +3122,7 @@ public class Connection implements Runnable {
 
   static private final int MAX_WAIT_TIME = (1 << 5); // ms (must be a power of 
2)
 
-  private final void writeAsync(SocketChannel channel, ByteBuffer buffer, 
boolean forceAsync,
+  private void writeAsync(SocketChannel channel, ByteBuffer buffer, boolean 
forceAsync,
       DistributionMessage p_msg, final DMStats stats) throws IOException {
     DistributionMessage msg = p_msg;
     // async/non-blocking
@@ -3301,7 +3238,7 @@ public class Connection implements Runnable {
                 if (msToWait <= 0) {
                   Thread.yield();
                 } else {
-                  boolean interrupted = Thread.interrupted();;
+                  boolean interrupted = Thread.interrupted();
                   try {
                     Thread.sleep(msToWait);
                   } catch (InterruptedException ex) {
@@ -3401,10 +3338,10 @@ public class Connection implements Runnable {
   /**
    * stateLock is used to synchronize state changes.
    */
-  protected Object stateLock = new Object();
+  private final Object stateLock = new Object();
 
   /** for timeout processing, this is the current state of the connection */
-  protected byte connectionState = STATE_IDLE;
+  private byte connectionState = STATE_IDLE;
 
   /* ~~~~~~~~~~~~~ connection states ~~~~~~~~~~~~~~~ */
   /** the connection is idle, but may be in use */
@@ -3420,16 +3357,11 @@ public class Connection implements Runnable {
   /** the connection is in use and is reading a message */
   protected static final byte STATE_READING = 5;
 
-  protected static final String[] STATE_NAMES =
-      new String[] {"idle", "sending", "post_sending", "reading_ack", 
"received_ack", "reading"};
   /* ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ */
 
   /** set to true if we exceeded the ack-wait-threshold waiting for a response 
*/
   protected volatile boolean ackTimedOut;
 
-  private static int ACK_SIZE = 1;
-  private static byte ACK_BYTE = 37;
-
   /**
    * @param msToWait number of milliseconds to wait for an ack. If 0 then wait 
forever.
    * @param msInterval interval between checks
@@ -3899,7 +3831,7 @@ public class Connection implements Runnable {
                     // } else {
                     // ConnectionTable.threadWantsSharedResources();
                   }
-                  this.owner.owner.stats.incThreadOwnedReceivers(1L, 
dominoNumber);
+                  this.conduit.stats.incThreadOwnedReceivers(1L, dominoNumber);
                   // Because this thread is not shared resource, it will be 
used for direct
                   // ack. Direct ack messages can be large. This call will 
resize the send
                   // buffer.
@@ -4039,6 +3971,10 @@ public class Connection implements Runnable {
     }
   }
 
+  protected TCPConduit getConduit() {
+    return this.conduit;
+  }
+
   protected Socket getSocket() throws SocketException {
     // fix for bug 37286
     Socket result = this.socket;
@@ -4177,7 +4113,7 @@ public class Connection implements Runnable {
   boolean nioChecked;
   boolean useNIO;
 
-  private final boolean useNIO() {
+  private boolean useNIO() {
     if (TCPConduit.useSSL) {
       return false;
     }
@@ -4193,7 +4129,7 @@ public class Connection implements Runnable {
     if (this.socket != null && (this.socket.getInetAddress() instanceof 
Inet6Address)) {
       String os = System.getProperty("os.name");
       if (os != null) {
-        if (os.indexOf("Windows") != -1) {
+        if (os.contains("Windows")) {
           this.useNIO = false;
         }
       }

http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
index 08a9009..c55af82 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/ConnectionTable.java
@@ -64,17 +64,9 @@ import 
org.apache.geode.internal.logging.log4j.LocalizedMessage;
  * 
  * @since GemFire 2.1
  */
-/*
- * Note: We no longer use InputMultiplexer If InputMux is reinstated then the 
manager needs to be
- * initialized and all lines that have a NOMUX preface should be uncommented
- * 
- */
 public class ConnectionTable {
   private static final Logger logger = LogService.getLogger();
 
-  /** a random number generator for secondary connection selection */
-  // static java.util.Random random = new java.util.Random();
-
   /** warning when descriptor limit reached */
   private static boolean ulimitWarningIssued;
 
@@ -82,6 +74,7 @@ public class ConnectionTable {
    * true if the current thread wants non-shared resources
    */
   private static ThreadLocal threadWantsOwnResources = new ThreadLocal();
+
   /**
    * Used for messages whose order must be preserved Only connections used for 
sending messages, and
    * receiving acks, will be put in this map.
@@ -132,9 +125,7 @@ public class ConnectionTable {
   /**
    * the conduit for this table
    */
-  protected final TCPConduit owner;
-  // ARB: temp making this protected to provide access to Connection.
-  // private final TCPConduit owner;
+  private final TCPConduit owner;
 
   /**
    * true if this table is no longer in use
@@ -205,17 +196,10 @@ public class ConnectionTable {
     return (Boolean) threadWantsOwnResources.get();
   }
 
-  // public static void setThreadOwnsResourcesRegistration(
-  // Boolean newValue) {
-  // threadWantsOwnResources.set(newValue);
-  // }
-  // private Map connections = new HashMap();
-  /* NOMUX: private InputMuxManager inputMuxManager; */
-  // private int lowWater;
-  // private int highWater;
+  public TCPConduit getOwner() {
+    return owner;
+  }
 
-  // private static boolean TRACK_SERVER_CONNECTIONS =
-  // System.getProperty("p2p.bidirectional", "true").equals("true");
 
   private ConnectionTable(TCPConduit c) throws IOException {
     this.owner = c;
@@ -226,10 +210,6 @@ public class ConnectionTable {
     this.threadConnectionMap = new ConcurrentHashMap();
     this.p2pReaderThreadPool = 
createThreadPoolForIO(c.getDM().getSystem().isShareSockets());
     this.socketCloser = new SocketCloser();
-    /*
-     * NOMUX: if (TCPConduit.useNIO) { inputMuxManager = new 
InputMuxManager(this);
-     * inputMuxManager.start(c.logger); }
-     */
   }
 
   private Executor createThreadPoolForIO(boolean conserveSockets) {
@@ -306,7 +286,6 @@ public class ConnectionTable {
       }
     }
 
-    // Stub id = conn.getRemoteId();
     if (conn != null) {
       synchronized (this.receivers) {
         this.owner.stats.incReceivers();
@@ -322,22 +301,9 @@ public class ConnectionTable {
             conn.remoteAddr);
       }
     }
-    // cleanupHighWater();
   }
 
 
-  // /** returns the connection associated with the given key, or null if
-  // no such connection exists */
-  // protected Connection basicGet(Serializable id) {
-  // synchronized (this.orderedConnectionMap) {
-  // return (Connection) this.orderedConnectionMap.get(id);
-  // }
-  // }
-
-  // protected Connection get(Serializable id) throws java.io.IOException {
-  // return get(id, false);
-  // }
-
 
   /**
    * Process a newly created PendingConnection
@@ -432,10 +398,10 @@ public class ConnectionTable {
   }
 
   /**
-   * unordered or conserve-sockets note that unordered connections are 
currently always shared
+   * unordered or conserve-sockets=true note that unordered connections are 
currently always shared
    * 
    * @param id the DistributedMember on which we are creating a connection
-   * @param threadOwnsResources whether unordered conn is owned by the current 
thread
+   * @param scheduleTimeout whether unordered connection should time out
    * @param preserveOrder whether to preserve order
    * @param startTime the ms clock start time for the operation
    * @param ackTimeout the ms ack-wait-threshold, or zero
@@ -444,9 +410,9 @@ public class ConnectionTable {
    * @throws IOException if unable to create the connection
    * @throws DistributedSystemDisconnectedException
    */
-  private Connection getUnorderedOrConserveSockets(DistributedMember id,
-      boolean threadOwnsResources, boolean preserveOrder, long startTime, long 
ackTimeout,
-      long ackSATimeout) throws IOException, 
DistributedSystemDisconnectedException {
+  private Connection getSharedConnection(DistributedMember id, boolean 
scheduleTimeout,
+      boolean preserveOrder, long startTime, long ackTimeout, long 
ackSATimeout)
+      throws IOException, DistributedSystemDisconnectedException {
     Connection result = null;
 
     final Map m = preserveOrder ? this.orderedConnectionMap : 
this.unorderedConnectionMap;
@@ -472,7 +438,7 @@ public class ConnectionTable {
     if (pc != null) {
       result = handleNewPendingConnection(id, true /* fixes bug 43386 */, 
preserveOrder, m, pc,
           startTime, ackTimeout, ackSATimeout);
-      if (!preserveOrder && threadOwnsResources) {
+      if (!preserveOrder && scheduleTimeout) {
         scheduleIdleTimeout(result);
       }
     } else { // we have existing connection
@@ -487,10 +453,10 @@ public class ConnectionTable {
             startTime, ackTimeout, ackSATimeout);
         if (logger.isDebugEnabled()) {
           if (result != null) {
-            logger.debug("getUnorderedOrConserveSockets {} myAddr={} theirAddr={}", result,
+            logger.debug("getSharedConnection {} myAddr={} theirAddr={}", result,
                 getConduit().getMemberId(), result.remoteAddr);
           } else {
-            logger.debug("getUnorderedOrConserveSockets: Connect failed");
+            logger.debug("getSharedConnection: Connect failed");
           }
         }
       } else {
@@ -512,7 +478,7 @@ public class ConnectionTable {
    * @throws IOException if the connection could not be created
    * @throws DistributedSystemDisconnectedException
    */
-  Connection getOrderedAndOwned(DistributedMember id, long startTime, long ackTimeout,
+  Connection getThreadOwnedConnection(DistributedMember id, long startTime, long ackTimeout,
       long ackSATimeout) throws IOException, DistributedSystemDisconnectedException {
     Connection result = null;
 
@@ -658,10 +624,10 @@ public class ConnectionTable {
     Connection result = null;
     boolean threadOwnsResources = threadOwnsResources();
     if (!preserveOrder || !threadOwnsResources) {
-      result = getUnorderedOrConserveSockets(id, threadOwnsResources, preserveOrder, startTime,
-          ackTimeout, ackSATimeout);
+      result = getSharedConnection(id, threadOwnsResources, preserveOrder, startTime, ackTimeout,
+          ackSATimeout);
     } else {
-      result = getOrderedAndOwned(id, startTime, ackTimeout, ackSATimeout);
+      result = getThreadOwnedConnection(id, startTime, ackTimeout, ackSATimeout);
     }
     if (result != null) {
       Assert.assertTrue(result.preserveOrder == preserveOrder);

http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
index 3872ee9..bf06953 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/DirectReplySender.java
@@ -56,7 +56,7 @@ class DirectReplySender implements ReplySender {
     // mutates the list when it has exceptions.
 
     // fix for bug #42199 - cancellation check
-    this.conn.owner.getDM().getCancelCriterion().checkCancelInProgress(null);
+    this.conn.getConduit().getDM().getCancelCriterion().checkCancelInProgress(null);
 
     if (logger.isTraceEnabled(LogMarker.DM)) {
       logger.trace(LogMarker.DM, "Sending a direct reply {} to {}", msg, conn.getRemoteAddress());

http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
index fc56271..be1f533 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgReader.java
@@ -94,7 +94,7 @@ public abstract class MsgReader {
   public abstract ByteBuffer readAtLeast(int bytes) throws IOException;
 
   protected DMStats getStats() {
-    return conn.owner.getConduit().stats;
+    return conn.getConduit().stats;
   }
 
   public static class Header {

http://git-wip-us.apache.org/repos/asf/geode/blob/6b2b7b2f/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
index 50f5fae..a4e35a4 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/NIOMsgReader.java
@@ -79,7 +79,7 @@ public class NIOMsgReader extends MsgReader {
     if (nioInputBuffer == null) {
       int allocSize = conn.getReceiveBufferSize();
       if (allocSize == -1) {
-        allocSize = conn.owner.getConduit().tcpBufferSize;
+        allocSize = conn.getConduit().tcpBufferSize;
       }
       if (allocSize > bufferSize) {
         bufferSize = allocSize;

Reply via email to