Cleanup Message class

Project: http://git-wip-us.apache.org/repos/asf/geode/repo
Commit: http://git-wip-us.apache.org/repos/asf/geode/commit/b4bf200d
Tree: http://git-wip-us.apache.org/repos/asf/geode/tree/b4bf200d
Diff: http://git-wip-us.apache.org/repos/asf/geode/diff/b4bf200d

Branch: refs/heads/feature/GEODE-2632-17
Commit: b4bf200dc6132afd523637045dc2899a334690b8
Parents: 006b135
Author: Kirk Lund <kl...@apache.org>
Authored: Mon May 22 13:47:55 2017 -0700
Committer: Kirk Lund <kl...@apache.org>
Committed: Wed May 24 16:41:07 2017 -0700

----------------------------------------------------------------------
 .../java/org/apache/geode/Instantiator.java     | 112 ++--
 .../geode/cache/client/internal/AbstractOp.java |   2 +-
 .../geode/cache/client/internal/PingOp.java     |  10 +-
 .../cache/tier/sockets/CacheClientUpdater.java  |  17 +-
 .../cache/tier/sockets/ChunkedMessage.java      |  19 +-
 .../internal/cache/tier/sockets/Message.java    | 591 ++++++++++---------
 .../cache/tier/sockets/ServerConnection.java    |  65 +-
 .../apache/geode/internal/tcp/Connection.java   |   2 +-
 .../org/apache/geode/internal/util/IOUtils.java |   6 +-
 .../cache/tier/sockets/MessageJUnitTest.java    |  64 +-
 .../internal/JUnit4DistributedTestCase.java     |   2 +-
 ...arallelGatewaySenderOperationsDUnitTest.java |  16 +-
 12 files changed, 448 insertions(+), 458 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/geode/blob/b4bf200d/geode-core/src/main/java/org/apache/geode/Instantiator.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/Instantiator.java 
b/geode-core/src/main/java/org/apache/geode/Instantiator.java
index 3c1ca06..c727e5b 100644
--- a/geode-core/src/main/java/org/apache/geode/Instantiator.java
+++ b/geode-core/src/main/java/org/apache/geode/Instantiator.java
@@ -20,15 +20,15 @@ import 
org.apache.geode.internal.cache.tier.sockets.ClientProxyMembershipID;
 import org.apache.geode.internal.i18n.LocalizedStrings;
 
 /**
- * <code>Instantiator</code> allows classes that implement {@link 
DataSerializable} to be registered
- * with the data serialization framework. Knowledge of 
<code>DataSerializable</code> classes allows
+ * {@code Instantiator} allows classes that implement {@link DataSerializable} 
to be registered
+ * with the data serialization framework. Knowledge of {@code 
DataSerializable} classes allows
  * the framework to optimize how instances of those classes are data 
serialized.
  *
  * <P>
  *
- * Ordinarily, when a <code>DataSerializable</code> object is written using
+ * Ordinarily, when a {@code DataSerializable} object is written using
  * {@link DataSerializer#writeObject(Object, java.io.DataOutput)}, a special 
marker class id is
- * written to the stream followed by the class name of the 
<code>DataSerializable</code> object.
+ * written to the stream followed by the class name of the {@code 
DataSerializable} object.
  * After the marker class id is read by {@link DataSerializer#readObject} it 
performs the following
  * operations,
  *
@@ -44,23 +44,20 @@ import org.apache.geode.internal.i18n.LocalizedStrings;
  *
  * </OL>
  *
- * However, if a <code>DataSerializable</code> class is {@linkplain 
#register(Instantiator)
+ * However, if a {@code DataSerializable} class is {@linkplain 
#register(Instantiator)
  * registered} with the data serialization framework and assigned a unique 
class id, an important
  * optimization can be performed that avoid the expense of using reflection to 
instantiate the
- * <code>DataSerializable</code> class. When the object is written using
+ * {@code DataSerializable} class. When the object is written using
  * {@link DataSerializer#writeObject(Object, java.io.DataOutput)}, the 
object's registered class id
  * is written to the stream. Consequently, when the data is read from the 
stream, the
- * {@link #newInstance} method of the appropriate <code>Instantiator</code> 
instance is invoked to
- * create an "empty" instance of the <code>DataSerializable</code> instead of 
using reflection to
+ * {@link #newInstance} method of the appropriate {@code Instantiator} 
instance is invoked to
+ * create an "empty" instance of the {@code DataSerializable} instead of using 
reflection to
  * create the new instance.
  *
  * <P>
  *
- * Commonly, a <code>DataSerializable</code> class will register itself with 
the
- * <code>Instantiator</code> in a static initializer as shown in the below 
example code.
- *
- * <!-- The source code for the CompanySerializer class resides in 
tests/com/examples/ds/User.java
- * Please keep the below code snippet in sync with that file. -->
+ * Commonly, a {@code DataSerializable} class will register itself with the
+ * {@code Instantiator} in a static initializer as shown in the below example 
code.
  *
  * <PRE>
 public class User implements DataSerializable {
@@ -101,22 +98,22 @@ public class User implements DataSerializable {
 }
  * </PRE>
  *
- * <code>Instantiator</code>s may be distributed to other members of the 
distributed system when
+ * {@code Instantiator}s may be distributed to other members of the 
distributed system when
  * they are registered. Consider the following scenario in which VM1 and VM2 
are members of the same
  * distributed system. Both VMs define the sameRegion and VM2's region 
replicates the contents of
- * VM1's using replication. VM1 puts an instance of the above 
<code>User</code> class into the
- * region. The act of instantiating <code>User</code> will load the 
<code>User</code> class and
- * invoke its static initializer, thus registering the 
<code>Instantiator</code> with the data
- * serialization framework. Because the region is a replicate, the 
<code>User</code> will be data
- * serialized and sent to VM2. However, when VM2 attempts to data deserialize 
the <code>User</code>,
- * its <code>Instantiator</code> will not necessarily be registered because 
<code>User</code>'s
+ * VM1's using replication. VM1 puts an instance of the above {@code User} 
class into the
+ * region. The act of instantiating {@code User} will load the {@code User} 
class and
+ * invoke its static initializer, thus registering the {@code Instantiator} 
with the data
+ * serialization framework. Because the region is a replicate, the {@code 
User} will be data
+ * serialized and sent to VM2. However, when VM2 attempts to data deserialize 
the {@code User},
+ * its {@code Instantiator} will not necessarily be registered because {@code 
User}'s
  * static initializer may not have been invoked yet. As a result, an exception 
would be logged while
- * deserializing the <code>User</code> and the replicate would not appear to 
have the new value. So,
- * in order to ensure that the <code>Instantiator</code> is registered in VM2, 
the data
- * serialization framework distributes a message to each member when an 
<code>Instantiator</code> is
+ * deserializing the {@code User} and the replicate would not appear to have 
the new value. So,
+ * in order to ensure that the {@code Instantiator} is registered in VM2, the 
data
+ * serialization framework distributes a message to each member when an {@code 
Instantiator} is
  * {@linkplain #register(Instantiator) registered}.
  * <p>
- * Note that the framework does not require that an <code>Instantiator</code> 
be
+ * Note that the framework does not require that an {@code Instantiator} be
  * {@link java.io.Serializable}, but it does require that it provide a
  * {@linkplain #Instantiator(Class, int) two-argument constructor}.
  *
@@ -133,63 +130,64 @@ public abstract class Instantiator {
    */
   private Class<? extends DataSerializable> clazz;
 
-  /** The id of this <code>Instantiator</code> */
+  /** The id of this {@code Instantiator} */
   private int id;
 
-  /** The eventId of this <code>Instantiator</code> */
+  /** The eventId of this {@code Instantiator} */
   private EventID eventId;
 
-  /** The originator of this <code>Instantiator</code> */
+  /** The originator of this {@code Instantiator} */
   private ClientProxyMembershipID context;
 
   /**
-   * Registers a <code>DataSerializable</code> class with the data 
serialization framework. This
+   * Registers a {@code DataSerializable} class with the data serialization 
framework. This
    * method is usually invoked from the static initializer of a class that 
implements
-   * <code>DataSerializable</code>.
+   * {@code DataSerializable}.
    *
-   * @param instantiator An <code>Instantiator</code> whose {@link 
#newInstance} method is invoked
+   * @param instantiator An {@code Instantiator} whose {@link #newInstance} 
method is invoked
    *        when an object is data deserialized.
    *
-   * @throws IllegalStateException If class <code>c</code> is already 
registered with a different
-   *         class id, or another class has already been registered with id 
<code>classId</code>
-   * @throws NullPointerException If <code>instantiator</code> is 
<code>null</code>.
+   * @throws IllegalStateException If class {@code c} is already registered 
with a different
+   *         class id, or another class has already been registered with id 
{@code classId}
+   * @throws NullPointerException If {@code instantiator} is {@code null}.
    */
   public static synchronized void register(Instantiator instantiator) {
     InternalInstantiator.register(instantiator, true);
   }
 
   /**
-   * Registers a <code>DataSerializable</code> class with the data 
serialization framework. This
+   * Registers a {@code DataSerializable} class with the data serialization 
framework. This
    * method is usually invoked from the static initializer of a class that 
implements
-   * <code>DataSerializable</code>.
+   * {@code DataSerializable}.
    *
-   * @param instantiator An <code>Instantiator</code> whose {@link 
#newInstance} method is invoked
+   * @param instantiator An {@code Instantiator} whose {@link #newInstance} 
method is invoked
    *        when an object is data deserialized.
    *
-   * @param distribute True if the registered <code>Instantiator</code> has to 
be distributed to
+   * @param distribute True if the registered {@code Instantiator} has to be 
distributed to
    *        other members of the distributed system. Note that if distribute 
is set to false it may
    *        still be distributed in some cases.
    *
-   * @throws IllegalArgumentException If class <code>c</code> is already 
registered with a different
-   *         class id, or another class has already been registered with id 
<code>classId</code>
-   * @throws NullPointerException If <code>instantiator</code> is 
<code>null</code>.
+   * @throws IllegalArgumentException If class {@code c} is already registered 
with a different
+   *         class id, or another class has already been registered with id 
{@code classId}
+   * @throws NullPointerException If {@code instantiator} is {@code null}.
    * @deprecated as of 9.0 use {@link Instantiator#register(Instantiator)} 
instead
    */
+  @Deprecated
   public static synchronized void register(Instantiator instantiator, boolean 
distribute) {
     InternalInstantiator.register(instantiator, distribute);
   }
 
   /**
-   * Creates a new <code>Instantiator</code> that instantiates a given class.
+   * Creates a new {@code Instantiator} that instantiates a given class.
    *
-   * @param c The <code>DataSerializable</code> class to register. This class 
must have a static
-   *        initializer that registers this <code>Instantiator</code>.
-   * @param classId A unique id for class <code>c</code>. The 
<code>classId</code> must not be zero.
-   *        This has been an <code>int</code> since dsPhase1.
+   * @param c The {@code DataSerializable} class to register. This class must 
have a static
+   *        initializer that registers this {@code Instantiator}.
+   * @param classId A unique id for class {@code c}. The {@code classId} must 
not be zero.
+   *        This has been an {@code int} since dsPhase1.
    *
-   * @throws IllegalArgumentException If <code>c</code> does not implement
-   *         <code>DataSerializable</code>, <code>classId</code> is less than 
or equal to zero.
-   * @throws NullPointerException If <code>c</code> is <code>null</code>
+   * @throws IllegalArgumentException If {@code c} does not implement
+   *         {@code DataSerializable}, {@code classId} is less than or equal 
to zero.
+   * @throws NullPointerException If {@code c} is {@code null}
    */
   public Instantiator(Class<? extends DataSerializable> c, int classId) {
     if (c == null) {
@@ -205,7 +203,7 @@ public abstract class Instantiator {
 
     if (classId == 0) {
       throw new 
IllegalArgumentException(LocalizedStrings.Instantiator_CLASS_ID_0_MUST_NOT_BE_0
-          .toLocalizedString(Integer.valueOf(classId)));
+          .toLocalizedString(classId));
     }
 
     this.clazz = c;
@@ -213,7 +211,7 @@ public abstract class Instantiator {
   }
 
   /**
-   * Creates a new "empty" instance of a <Code>DataSerializable</code> class 
whose state will be
+   * Creates a new "empty" instance of a {@code DataSerializable} class whose 
state will be
    * filled in by invoking its {@link DataSerializable#fromData fromData} 
method.
    *
    * @see DataSerializer#readObject
@@ -221,29 +219,29 @@ public abstract class Instantiator {
   public abstract DataSerializable newInstance();
 
   /**
-   * Returns the <code>DataSerializable</code> class that is instantiated by 
this
-   * <code>Instantiator</code>.
+   * Returns the {@code DataSerializable} class that is instantiated by this
+   * {@code Instantiator}.
    */
   public Class<? extends DataSerializable> getInstantiatedClass() {
     return this.clazz;
   }
 
   /**
-   * Returns the unique <code>id</code> of this <code>Instantiator</code>.
+   * Returns the unique {@code id} of this {@code Instantiator}.
    */
   public int getId() {
     return this.id;
   }
 
   /**
-   * sets the unique <code>eventId</code> of this <code>Instantiator</code>. 
For internal use only.
+   * sets the unique {@code eventId} of this {@code Instantiator}. For 
internal use only.
    */
   public void setEventId(Object/* EventID */ eventId) {
     this.eventId = (EventID) eventId;
   }
 
   /**
-   * Returns the unique <code>eventId</code> of this 
<code>Instantiator</code>. For internal use
+   * Returns the unique {@code eventId} of this {@code Instantiator}. For 
internal use
    * only.
    */
   public Object/* EventID */ getEventId() {
@@ -251,14 +249,14 @@ public abstract class Instantiator {
   }
 
   /**
-   * sets the context of this <code>Instantiator</code>. For internal use only.
+   * sets the context of this {@code Instantiator}. For internal use only.
    */
   public void setContext(Object/* ClientProxyMembershipID */ context) {
     this.context = (ClientProxyMembershipID) context;
   }
 
   /**
-   * Returns the context of this <code>Instantiator</code>. For internal use 
only.
+   * Returns the context of this {@code Instantiator}. For internal use only.
    */
   public Object/* ClientProxyMembershipID */ getContext() {
     return this.context;

http://git-wip-us.apache.org/repos/asf/geode/blob/b4bf200d/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
index a0cb7d4..7af4f4f 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/AbstractOp.java
@@ -228,7 +228,7 @@ public abstract class AbstractOp implements Op {
   protected abstract Object processResponse(Message msg) throws Exception;
 
   /**
-   * Return true of <code>msgType</code> indicates the operation had an error 
on the server.
+   * Return true if <code>msgType</code> indicates the operation had an error 
on the server.
    */
   protected abstract boolean isErrorResponse(int msgType);
 

http://git-wip-us.apache.org/repos/asf/geode/blob/b4bf200d/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java
index cc30f1c..2e52542 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/PingOp.java
@@ -14,7 +14,6 @@
  */
 package org.apache.geode.cache.client.internal;
 
-import org.apache.geode.distributed.internal.InternalDistributedSystem;
 import org.apache.geode.distributed.internal.ServerLocation;
 import org.apache.geode.internal.cache.tier.MessageType;
 import org.apache.geode.internal.cache.tier.sockets.Message;
@@ -25,6 +24,7 @@ import org.apache.geode.internal.cache.tier.sockets.Message;
  * @since GemFire 5.7
  */
 public class PingOp {
+
   /**
    * Ping the specified server to see if it is still alive
    * 
@@ -47,13 +47,13 @@ public class PingOp {
     /**
      * @throws org.apache.geode.SerializationException if serialization fails
      */
-    public PingOpImpl() {
+    PingOpImpl() {
       super(MessageType.PING, 0);
     }
 
     @Override
     protected void processSecureBytes(Connection cnx, Message message) throws 
Exception {
-      Message.messageType.set(null);
+      Message.MESSAGE_TYPE.set(null);
     }
 
     @Override
@@ -64,9 +64,9 @@ public class PingOp {
     @Override
     protected void sendMessage(Connection cnx) throws Exception {
       getMessage().clearMessageHasSecurePartFlag();
-      startTime = System.currentTimeMillis();
+      this.startTime = System.currentTimeMillis();
       getMessage().send(false);
-      Message.messageType.set(MessageType.PING);
+      Message.MESSAGE_TYPE.set(MessageType.PING);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/geode/blob/b4bf200d/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
index 291db65..7698550 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/CacheClientUpdater.java
@@ -572,20 +572,9 @@ public class CacheClientUpdater extends Thread implements 
ClientUpdater, Disconn
    * the server.
    */
   private Message initializeMessage() {
-    Message _message = new Message(2, Version.CURRENT);
-    try {
-      _message.setComms(socket, in, out, commBuffer, this.stats);
-    } catch (IOException e) {
-      if (!quitting()) {
-        if (logger.isDebugEnabled()) {
-          logger.debug(
-              "{}: Caught following exception while attempting to initialize a 
server-to-client communication socket and will exit",
-              this, e);
-        }
-        stopProcessing();
-      }
-    }
-    return _message;
+    Message message = new Message(2, Version.CURRENT);
+    message.setComms(this.socket, this.in, this.out, this.commBuffer, 
this.stats);
+    return message;
   }
 
   /* refinement of method inherited from Thread */

http://git-wip-us.apache.org/repos/asf/geode/blob/b4bf200d/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
index 2a5a3d7..be30061 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ChunkedMessage.java
@@ -22,7 +22,6 @@ import org.apache.geode.internal.logging.LogService;
 
 import java.io.EOFException;
 import java.io.IOException;
-import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 
 import org.apache.logging.log4j.Logger;
@@ -36,7 +35,7 @@ import org.apache.logging.log4j.Logger;
  * 
  * <PRE>
  * 
- * msgType - int - 4 bytes type of message, types enumerated below
+ * messageType - int - 4 bytes type of message, types enumerated below
  * 
  * numberOfParts - int - 4 bytes number of elements (LEN-BYTE* pairs) contained
  * in the payload. Message can be a multi-part message
@@ -153,7 +152,7 @@ public class ChunkedMessage extends Message {
 
   public void setLastChunkAndNumParts(boolean lastChunk, int numParts) {
     setLastChunk(lastChunk);
-    if (this.sc != null && 
this.sc.getClientVersion().compareTo(Version.GFE_65) >= 0) {
+    if (this.serverConnection != null && 
this.serverConnection.getClientVersion().compareTo(Version.GFE_65) >= 0) {
       // we us e three bits for number of parts in last chunk byte
       // we us e three bits for number of parts in last chunk byte
       byte localLastChunk = (byte) (numParts << 5);
@@ -162,7 +161,7 @@ public class ChunkedMessage extends Message {
   }
 
   public void setServerConnection(ServerConnection servConn) {
-    if (this.sc != servConn)
+    if (this.serverConnection != servConn)
       throw new IllegalStateException("this.sc was not correctly set");
   }
 
@@ -209,7 +208,7 @@ public class ChunkedMessage extends Message {
         // Set the header and payload fields only after receiving all the
         // socket data, providing better message consistency in the face
         // of exceptional conditions (e.g. IO problems, timeouts etc.)
-        this.msgType = type;
+        this.messageType = type;
         this.numberOfParts = numParts; // Already set in setPayloadFields via 
setNumberOfParts
         this.transactionId = txid;
       }
@@ -241,14 +240,14 @@ public class ChunkedMessage extends Message {
     int totalBytesRead = 0;
     do {
       int bytesRead = 0;
-      bytesRead = is.read(cb.array(), totalBytesRead, CHUNK_HEADER_LENGTH - 
totalBytesRead);
+      bytesRead = inputStream.read(cb.array(), totalBytesRead, 
CHUNK_HEADER_LENGTH - totalBytesRead);
       if (bytesRead == -1) {
         throw new EOFException(
             
LocalizedStrings.ChunkedMessage_CHUNK_READ_ERROR_CONNECTION_RESET.toLocalizedString());
       }
       totalBytesRead += bytesRead;
-      if (this.msgStats != null) {
-        this.msgStats.incReceivedBytes(bytesRead);
+      if (this.messageStats != null) {
+        this.messageStats.incReceivedBytes(bytesRead);
       }
     } while (totalBytesRead < CHUNK_HEADER_LENGTH);
 
@@ -315,7 +314,7 @@ public class ChunkedMessage extends Message {
    * Sends a chunk of this message.
    */
   public void sendChunk(ServerConnection servConn) throws IOException {
-    if (this.sc != servConn)
+    if (this.serverConnection != servConn)
       throw new IllegalStateException("this.sc was not correctly set");
     sendChunk();
   }
@@ -355,7 +354,7 @@ public class ChunkedMessage extends Message {
   protected void getHeaderBytesForWrite() {
     final ByteBuffer cb = getCommBuffer();
     cb.clear();
-    cb.putInt(this.msgType);
+    cb.putInt(this.messageType);
     cb.putInt(this.numberOfParts);
 
     cb.putInt(this.transactionId);

http://git-wip-us.apache.org/repos/asf/geode/blob/b4bf200d/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
----------------------------------------------------------------------
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
index f102b2d..354ad0f 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/Message.java
@@ -14,6 +14,8 @@
  */
 package org.apache.geode.internal.cache.tier.sockets;
 
+import static org.apache.geode.internal.util.IOUtils.close;
+
 import org.apache.geode.SerializationException;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.Assert;
@@ -34,7 +36,6 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.Socket;
-import java.net.SocketTimeoutException;
 import java.nio.ByteBuffer;
 import java.nio.channels.SocketChannel;
 import java.util.Map;
@@ -47,7 +48,7 @@ import java.util.concurrent.TimeUnit;
  * and serialize it out to the wire.
  *
  * <PRE>
- * msgType       - int   - 4 bytes type of message, types enumerated below
+ * messageType       - int   - 4 bytes type of message, types enumerated below
  *
  * msgLength     - int - 4 bytes   total length of variable length payload
  *
@@ -55,10 +56,10 @@ import java.util.concurrent.TimeUnit;
  *                     contained in the payload. Message can
  *                       be a multi-part message
  *
- * transId       - int - 4 bytes  filled in by the requestor, copied back into
+ * transId       - int - 4 bytes  filled in by the requester, copied back into
  *                    the response
  *
- * flags         - byte- 1 byte   filled in by the requestor
+ * flags         - byte- 1 byte   filled in by the requester
  * len1
  * part1
  * .
@@ -76,18 +77,16 @@ import java.util.concurrent.TimeUnit;
  *
  * See also <a href="package-summary.html#messages">package description</a>.
  *
- * @see org.apache.geode.internal.cache.tier.MessageType
- *
+ * @see MessageType
  */
 public class Message {
 
-  public static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824;
-  /**
-   * maximum size of an outgoing message. See GEODE-478
-   */
-  public static int MAX_MESSAGE_SIZE =
-      Integer.getInteger(DistributionConfig.GEMFIRE_PREFIX + 
"client.max-message-size",
-          DEFAULT_MAX_MESSAGE_SIZE).intValue();
+  // Tentative workaround to avoid OOM stated in #46754.
+  public static final ThreadLocal<Integer> MESSAGE_TYPE = new ThreadLocal<>();
+
+  public static final String MAX_MESSAGE_SIZE_PROPERTY = 
DistributionConfig.GEMFIRE_PREFIX + "client.max-message-size";
+
+  static final int DEFAULT_MAX_MESSAGE_SIZE = 1073741824;
 
   private static final Logger logger = LogService.getLogger();
 
@@ -97,83 +96,95 @@ public class Message {
 
   private static final ThreadLocal<ByteBuffer> tlCommBuffer = new 
ThreadLocal<>();
 
-  private static final byte[] TRUE;
-  private static final byte[] FALSE;
+  // These two statics are fields shoved into the flags byte for transmission.
+  // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the 
other
+  // is left in place
+  private static final byte MESSAGE_HAS_SECURE_PART = (byte) 0x02;
+  private static final byte MESSAGE_IS_RETRY = (byte) 0x04;
 
-  static {
+  private static final byte MESSAGE_IS_RETRY_MASK = (byte) 0xFB;
+
+  private static final int DEFAULT_CHUNK_SIZE = 1024;
+
+  private static final byte[] TRUE = defineTrue();
+  private static final byte[] FALSE = defineFalse();
+
+  private static byte[] defineTrue() {
+    HeapDataOutputStream hdos = new HeapDataOutputStream(10, null);
     try {
-      HeapDataOutputStream hdos = new HeapDataOutputStream(10, null);
       BlobHelper.serializeTo(Boolean.TRUE, hdos);
-      TRUE = hdos.toByteArray();
-    } catch (Exception e) {
+      return hdos.toByteArray();
+    } catch (IOException e) {
       throw new IllegalStateException(e);
+    } finally {
+      close(hdos);
     }
+  }
 
+  private static byte[] defineFalse() {
+    HeapDataOutputStream hdos = new HeapDataOutputStream(10, null);
     try {
-      HeapDataOutputStream hdos = new HeapDataOutputStream(10, null);
       BlobHelper.serializeTo(Boolean.FALSE, hdos);
-      FALSE = hdos.toByteArray();
-    } catch (Exception e) {
+      return hdos.toByteArray();
+    } catch (IOException e) {
       throw new IllegalStateException(e);
+    } finally {
+      close(hdos);
     }
   }
 
-  protected int msgType;
-  protected int payloadLength = 0;
-  protected int numberOfParts = 0;
+  /**
+   * maximum size of an outgoing message. See GEODE-478
+   */
+  private final int maxMessageSize;
+
+  protected int messageType;
+  private int payloadLength = 0;
+  int numberOfParts = 0;
   protected int transactionId = TXManagerImpl.NOTX;
-  protected int currentPart = 0;
-  protected Part[] partsList = null;
-  protected ByteBuffer cachedCommBuffer;
+  int currentPart = 0;
+  private Part[] partsList = null;
+  private ByteBuffer cachedCommBuffer;
   protected Socket socket = null;
-  protected SocketChannel sockCh = null;
-  protected OutputStream os = null;
-  protected InputStream is = null;
-  protected boolean messageModified = true;
+  private SocketChannel socketChannel = null;
+  private OutputStream outputStream = null;
+  protected InputStream inputStream = null;
+  private boolean messageModified = true;
+
   /** is this message a retry of a previously sent message? */
-  protected boolean isRetry;
+  private boolean isRetry;
+
   private byte flags = 0x00;
-  protected MessageStats msgStats = null;
-  protected ServerConnection sc = null;
+  MessageStats messageStats = null;
+  protected ServerConnection serverConnection = null;
   private int maxIncomingMessageLength = -1;
   private Semaphore dataLimiter = null;
-  // private int MAX_MSGS = -1;
-  private Semaphore msgLimiter = null;
-  private boolean hdrRead = false;
-  private int chunkSize = 1024;// Default Chunk Size.
+  private Semaphore messageLimiter = null;
+  private boolean readHeader = false;
+  private int chunkSize = DEFAULT_CHUNK_SIZE;
 
-  protected Part securePart = null;
+  Part securePart = null;
   private boolean isMetaRegion = false;
 
-
-  // These two statics are fields shoved into the flags byte for transmission.
-  // The MESSAGE_IS_RETRY bit is stripped out during deserialization but the 
other
-  // is left in place
-  public static final byte MESSAGE_HAS_SECURE_PART = (byte) 0x02;
-  public static final byte MESSAGE_IS_RETRY = (byte) 0x04;
-
-  public static final byte MESSAGE_IS_RETRY_MASK = (byte) 0xFB;
-
-  // Tentative workaround to avoid OOM stated in #46754.
-  public static final ThreadLocal<Integer> messageType = new 
ThreadLocal<Integer>();
-
-  Version version;
+  private Version version;
 
   /**
    * Creates a new message with the given number of parts
    */
   public Message(int numberOfParts, Version destVersion) {
+    this.maxMessageSize = Integer.getInteger(MAX_MESSAGE_SIZE_PROPERTY, 
DEFAULT_MAX_MESSAGE_SIZE);
     this.version = destVersion;
     Assert.assertTrue(destVersion != null, "Attempt to create an unversioned 
message");
-    partsList = new Part[numberOfParts];
+    this.partsList = new Part[numberOfParts];
     this.numberOfParts = numberOfParts;
-    for (int i = 0; i < partsList.length; i++) {
-      partsList[i] = new Part();
+    int partsListLength = this.partsList.length;
+    for (int i = 0; i < partsListLength; i++) {
+      this.partsList[i] = new Part();
     }
   }
 
   public boolean isSecureMode() {
-    return securePart != null;
+    return this.securePart != null;
   }
 
   public byte[] getSecureBytes() throws IOException, ClassNotFoundException {
@@ -186,7 +197,7 @@ public class Message {
       throw new IllegalArgumentException(
           LocalizedStrings.Message_INVALID_MESSAGETYPE.toLocalizedString());
     }
-    this.msgType = msgType;
+    this.messageType = msgType;
   }
 
   public void setVersion(Version clientVersion) {
@@ -194,17 +205,15 @@ public class Message {
   }
 
   public void setMessageHasSecurePartFlag() {
-    this.flags = (byte) (this.flags | MESSAGE_HAS_SECURE_PART);
+    this.flags |= MESSAGE_HAS_SECURE_PART;
   }
 
   public void clearMessageHasSecurePartFlag() {
-    this.flags = (byte) (this.flags & MESSAGE_HAS_SECURE_PART);
+    this.flags &= MESSAGE_HAS_SECURE_PART;
   }
 
   /**
    * Sets and builds the {@link Part}s that are sent in the payload of the 
Message
-   * 
-   * @param numberOfParts
    */
   public void setNumberOfParts(int numberOfParts) {
     // hitesh: need to add security header here from server
@@ -227,9 +236,7 @@ public class Message {
   }
 
   /**
-   * For boundary testing we may need to inject mock parts
-   * 
-   * @param parts
+   * For boundary testing we may need to inject mock parts. For testing only.
    */
   void setParts(Part[] parts) {
     this.partsList = parts;
@@ -260,7 +267,7 @@ public class Message {
   /**
    * When building a Message this will return the number of the next Part to 
be added to the message
    */
-  public int getNextPartNumber() {
+  int getNextPartNumber() {
     return this.currentPart;
   }
 
@@ -268,32 +275,41 @@ public class Message {
     addStringPart(str, false);
   }
 
-  private static final Map<String, byte[]> CACHED_STRINGS = new 
ConcurrentHashMap<String, byte[]>();
+  private static final Map<String, byte[]> CACHED_STRINGS = new 
ConcurrentHashMap<>();
 
   public void addStringPart(String str, boolean enableCaching) {
     if (str == null) {
-      addRawPart((byte[]) null, false);
-    } else {
-      Part part = partsList[this.currentPart];
-      if (enableCaching) {
-        byte[] bytes = CACHED_STRINGS.get(str);
-        if (bytes == null) {
-          HeapDataOutputStream hdos = new HeapDataOutputStream(str);
+      addRawPart(null, false);
+      return;
+    }
+
+    Part part = this.partsList[this.currentPart];
+    if (enableCaching) {
+      byte[] bytes = CACHED_STRINGS.get(str);
+      if (bytes == null) {
+        HeapDataOutputStream hdos = new HeapDataOutputStream(str);
+        try {
           bytes = hdos.toByteArray();
           CACHED_STRINGS.put(str, bytes);
+        } finally {
+          close(hdos);
         }
-        part.setPartState(bytes, false);
-      } else {
-        HeapDataOutputStream hdos = new HeapDataOutputStream(str);
-        this.messageModified = true;
-        part.setPartState(hdos, false);
       }
-      this.currentPart++;
+      part.setPartState(bytes, false);
+    } else {
+      HeapDataOutputStream hdos = new HeapDataOutputStream(str);
+      try {
+      this.messageModified = true;
+      part.setPartState(hdos, false);
+      } finally {
+        close(hdos);
+      }
     }
+    this.currentPart++;
   }
 
   /*
-   * Adds a new part to this message that contains a <code>byte</code> array 
(as opposed to a
+   * Adds a new part to this message that contains a {@code byte} array (as 
opposed to a
    * serialized object).
    *
    * @see #addPart(byte[], boolean)
@@ -312,13 +328,6 @@ public class Message {
     }
   }
 
-  public void addDeltaPart(HeapDataOutputStream hdos) {
-    this.messageModified = true;
-    Part part = partsList[this.currentPart];
-    part.setPartState(hdos, false);
-    this.currentPart++;
-  }
-
   public void addObjPart(Object o) {
     addObjPart(o, false);
   }
@@ -345,6 +354,9 @@ public class Message {
     }
   }
 
+  /**
+   * Adds {@code o} as a part; a null {@code o} is added as a null raw part.
+   */
   public void addPartInAnyForm(@Unretained Object o, boolean isObject) {
     if (o == null) {
       addRawPart((byte[]) o, false);
@@ -353,7 +365,7 @@ public class Message {
     } else if (o instanceof StoredObject) {
       // It is possible it is an off-heap StoredObject that contains a simple 
non-object byte[].
       this.messageModified = true;
-      Part part = partsList[this.currentPart];
+      Part part = this.partsList[this.currentPart];
       part.setPartState((StoredObject) o, isObject);
       this.currentPart++;
     } else {
@@ -362,59 +374,61 @@ public class Message {
   }
 
   private void serializeAndAddPartNoCopying(Object o) {
-    HeapDataOutputStream hdos;
-    Version v = version;
-    if (version.equals(Version.CURRENT)) {
+    Version v = this.version;
+    if (this.version.equals(Version.CURRENT)) {
       v = null;
     }
+    
     // create the HDOS with a flag telling it that it can keep any byte[] or 
ByteBuffers/ByteSources
     // passed to it.
-    hdos = new HeapDataOutputStream(chunkSize, v, true);
+    HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v, 
true);
     try {
       BlobHelper.serializeTo(o, hdos);
+      this.messageModified = true;
+      Part part = this.partsList[this.currentPart];
+      part.setPartState(hdos, true);
+      this.currentPart++;
     } catch (IOException ex) {
       throw new SerializationException("failed serializing object", ex);
+    } finally {
+      close(hdos);
     }
-    this.messageModified = true;
-    Part part = partsList[this.currentPart];
-    part.setPartState(hdos, true);
-    this.currentPart++;
-
   }
 
   private void serializeAndAddPart(Object o, boolean zipValues) {
     if (zipValues) {
       throw new UnsupportedOperationException("zipValues no longer supported");
-
-    } else {
-      HeapDataOutputStream hdos;
-      Version v = version;
-      if (version.equals(Version.CURRENT)) {
-        v = null;
-      }
-      hdos = new HeapDataOutputStream(chunkSize, v);
-      try {
-        BlobHelper.serializeTo(o, hdos);
-      } catch (IOException ex) {
-        throw new SerializationException("failed serializing object", ex);
-      }
+    }
+    
+    Version v = this.version;
+    if (this.version.equals(Version.CURRENT)) {
+      v = null;
+    }
+    
+    HeapDataOutputStream hdos = new HeapDataOutputStream(this.chunkSize, v);
+    try {
+      BlobHelper.serializeTo(o, hdos);
       this.messageModified = true;
-      Part part = partsList[this.currentPart];
+      Part part = this.partsList[this.currentPart];
       part.setPartState(hdos, true);
       this.currentPart++;
+    } catch (IOException ex) {
+      throw new SerializationException("failed serializing object", ex);
+    } finally {
+      close(hdos);
     }
   }
 
   public void addIntPart(int v) {
     this.messageModified = true;
-    Part part = partsList[this.currentPart];
+    Part part = this.partsList[this.currentPart];
     part.setInt(v);
     this.currentPart++;
   }
 
   public void addLongPart(long v) {
     this.messageModified = true;
-    Part part = partsList[this.currentPart];
+    Part part = this.partsList[this.currentPart];
     part.setLong(v);
     this.currentPart++;
   }
@@ -424,13 +438,13 @@ public class Message {
    */
   public void addRawPart(byte[] newPart, boolean isObject) {
     this.messageModified = true;
-    Part part = partsList[this.currentPart];
+    Part part = this.partsList[this.currentPart];
     part.setPartState(newPart, isObject);
     this.currentPart++;
   }
 
   public int getMessageType() {
-    return this.msgType;
+    return this.messageType;
   }
 
   public int getPayloadLength() {
@@ -451,7 +465,7 @@ public class Message {
 
   public Part getPart(int index) {
     if (index < this.numberOfParts) {
-      Part p = partsList[index];
+      Part p = this.partsList[index];
       if (this.version != null) {
         p.setVersion(this.version);
       }
@@ -480,9 +494,9 @@ public class Message {
     if (len != 0) {
       this.payloadLength = 0;
     }
-    if (this.hdrRead) {
-      if (this.msgStats != null) {
-        this.msgStats.decMessagesBeingReceived(len);
+    if (this.readHeader) {
+      if (this.messageStats != null) {
+        this.messageStats.decMessagesBeingReceived(len);
       }
     }
     ByteBuffer buffer = getCommBuffer();
@@ -495,20 +509,18 @@ public class Message {
       this.dataLimiter = null;
       this.maxIncomingMessageLength = 0;
     }
-    if (this.hdrRead) {
-      if (this.msgLimiter != null) {
-        this.msgLimiter.release(1);
-        this.msgLimiter = null;
+    if (this.readHeader) {
+      if (this.messageLimiter != null) {
+        this.messageLimiter.release(1);
+        this.messageLimiter = null;
       }
-      this.hdrRead = false;
+      this.readHeader = false;
     }
     this.flags = 0;
   }
 
   protected void packHeaderInfoForSending(int msgLen, boolean 
isSecurityHeader) {
-    // hitesh: setting second bit of flags byte for client
-    // this is not require but this makes all changes easily at client side 
right now
-    // just see this bit and process security header
+    // Set the second bit of the flags byte for the client. This is not 
required, but it keeps the client-side change simple for now: the client just 
checks this bit and then processes the security header.
     byte flagsByte = this.flags;
     if (isSecurityHeader) {
       flagsByte |= MESSAGE_HAS_SECURE_PART;
@@ -516,14 +528,14 @@ public class Message {
     if (this.isRetry) {
       flagsByte |= MESSAGE_IS_RETRY;
     }
-    
getCommBuffer().putInt(this.msgType).putInt(msgLen).putInt(this.numberOfParts)
-        .putInt(this.transactionId).put(flagsByte);
+    
getCommBuffer().putInt(this.messageType).putInt(msgLen).putInt(this.numberOfParts)
+                   .putInt(this.transactionId).put(flagsByte);
   }
 
   protected Part getSecurityPart() {
-    if (this.sc != null) {
+    if (this.serverConnection != null) {
       // look types right put get etc
-      return this.sc.updateAndGetSecurityPart();
+      return this.serverConnection.updateAndGetSecurityPart();
     }
     return null;
   }
@@ -537,7 +549,7 @@ public class Message {
     this.isMetaRegion = isMetaRegion;
   }
 
-  public boolean getAndResetIsMetaRegion() {
+  boolean getAndResetIsMetaRegion() {
     boolean isMetaRegion = this.isMetaRegion;
     this.isMetaRegion = false;
     return isMetaRegion;
@@ -546,21 +558,20 @@ public class Message {
   /**
    * Sends this message out on its socket.
    */
-  protected void sendBytes(boolean clearMessage) throws IOException {
-    if (this.sc != null) {
+  void sendBytes(boolean clearMessage) throws IOException {
+    if (this.serverConnection != null) {
       // Keep track of the fact that we are making progress.
-      this.sc.updateProcessingMessage();
+      this.serverConnection.updateProcessingMessage();
     }
     if (this.socket == null) {
       throw new 
IOException(LocalizedStrings.Message_DEAD_CONNECTION.toLocalizedString());
     }
     try {
-      final ByteBuffer cb = getCommBuffer();
-      if (cb == null) {
+      final ByteBuffer commBuffer = getCommBuffer();
+      if (commBuffer == null) {
         throw new IOException("No buffer");
       }
-      int msgLen = 0;
-      synchronized (cb) {
+      synchronized (commBuffer) {
         long totalPartLen = 0;
         long headerLen = 0;
         int partsToTransmit = this.numberOfParts;
@@ -581,50 +592,50 @@ public class Message {
           partsToTransmit++;
         }
 
-        if ((headerLen + totalPartLen) > Integer.MAX_VALUE) {
+        if (headerLen + totalPartLen > Integer.MAX_VALUE) {
           throw new MessageTooLargeException(
               "Message size (" + (headerLen + totalPartLen) + ") exceeds 
maximum integer value");
         }
 
-        msgLen = (int) (headerLen + totalPartLen);
+        int msgLen = (int) (headerLen + totalPartLen);
 
-        if (msgLen > MAX_MESSAGE_SIZE) {
+        if (msgLen > this.maxMessageSize) {
           throw new MessageTooLargeException("Message size (" + msgLen
-              + ") exceeds gemfire.client.max-message-size setting (" + 
MAX_MESSAGE_SIZE + ")");
+                                             + ") exceeds 
gemfire.client.max-message-size setting (" + this.maxMessageSize + ")");
         }
 
-        cb.clear();
-        packHeaderInfoForSending(msgLen, (securityPart != null));
+        commBuffer.clear();
+        packHeaderInfoForSending(msgLen, securityPart != null);
         for (int i = 0; i < partsToTransmit; i++) {
-          Part part = (i == this.numberOfParts) ? securityPart : partsList[i];
+          Part part = i == this.numberOfParts ? securityPart : 
this.partsList[i];
 
-          if (cb.remaining() < PART_HEADER_SIZE) {
+          if (commBuffer.remaining() < PART_HEADER_SIZE) {
             flushBuffer();
           }
 
           int partLen = part.getLength();
-          cb.putInt(partLen);
-          cb.put(part.getTypeCode());
-          if (partLen <= cb.remaining()) {
-            part.writeTo(cb);
+          commBuffer.putInt(partLen);
+          commBuffer.put(part.getTypeCode());
+          if (partLen <= commBuffer.remaining()) {
+            part.writeTo(commBuffer);
           } else {
             flushBuffer();
-            if (this.sockCh != null) {
-              part.writeTo(this.sockCh, cb);
+            if (this.socketChannel != null) {
+              part.writeTo(this.socketChannel, commBuffer);
             } else {
-              part.writeTo(this.os, cb);
+              part.writeTo(this.outputStream, commBuffer);
             }
-            if (this.msgStats != null) {
-              this.msgStats.incSentBytes(partLen);
+            if (this.messageStats != null) {
+              this.messageStats.incSentBytes(partLen);
             }
           }
         }
-        if (cb.position() != 0) {
+        if (commBuffer.position() != 0) {
           flushBuffer();
         }
         this.messageModified = false;
-        if (this.sockCh == null) {
-          this.os.flush();
+        if (this.socketChannel == null) {
+          this.outputStream.flush();
         }
       }
     } finally {
@@ -634,69 +645,67 @@ public class Message {
     }
   }
 
-  protected void flushBuffer() throws IOException {
+  void flushBuffer() throws IOException {
     final ByteBuffer cb = getCommBuffer();
-    if (this.sockCh != null) {
+    if (this.socketChannel != null) {
       cb.flip();
       do {
-        this.sockCh.write(cb);
+        this.socketChannel.write(cb);
       } while (cb.remaining() > 0);
     } else {
-      this.os.write(cb.array(), 0, cb.position());
+      this.outputStream.write(cb.array(), 0, cb.position());
     }
-    if (this.msgStats != null) {
-      this.msgStats.incSentBytes(cb.position());
+    if (this.messageStats != null) {
+      this.messageStats.incSentBytes(cb.position());
     }
     cb.clear();
   }
 
   private void read() throws IOException {
     clearParts();
-    // TODO:Hitesh ??? for server changes make sure sc is not null as this 
class also used by client
-    // :(
+    // TODO: for server changes make sure serverConnection is not null, as 
this class is also used by the client
     readHeaderAndPayload();
   }
 
   /**
    * Read the actual bytes of the header off the socket
    */
-  protected void fetchHeader() throws IOException {
+  void fetchHeader() throws IOException {
     final ByteBuffer cb = getCommBuffer();
     cb.clear();
-    // msgType is invalidated here and can be used as an indicator
+    
+    // messageType is invalidated here and can be used as an indicator
     // of problems reading the message
-    this.msgType = MessageType.INVALID;
-
-    int hdr = 0;
+    this.messageType = MessageType.INVALID;
 
     final int headerLength = getHeaderLength();
-    if (this.sockCh != null) {
+    if (this.socketChannel != null) {
       cb.limit(headerLength);
       do {
-        int bytesRead = this.sockCh.read(cb);
-        // System.out.println("DEBUG: fetchHeader read " + bytesRead + " bytes 
commBuffer=" + cb);
+        int bytesRead = this.socketChannel.read(cb);
         if (bytesRead == -1) {
           throw new EOFException(
               
LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER
                   .toLocalizedString());
         }
-        if (this.msgStats != null) {
-          this.msgStats.incReceivedBytes(bytesRead);
+        if (this.messageStats != null) {
+          this.messageStats.incReceivedBytes(bytesRead);
         }
       } while (cb.remaining() > 0);
       cb.flip();
+      
     } else {
+      int hdr = 0;
       do {
-        int bytesRead = -1;
-        bytesRead = this.is.read(cb.array(), hdr, headerLength - hdr);
+        int bytesRead = this.inputStream.read(cb.array(), hdr, headerLength - 
hdr);
         if (bytesRead == -1) {
           throw new EOFException(
               
LocalizedStrings.Message_THE_CONNECTION_HAS_BEEN_RESET_WHILE_READING_THE_HEADER
                   .toLocalizedString());
         }
         hdr += bytesRead;
-        if (this.msgStats != null) {
-          this.msgStats.incReceivedBytes(bytesRead);
+        if (this.messageStats != null) {
+          this.messageStats.incReceivedBytes(bytesRead);
         }
       } while (hdr < headerLength);
 
@@ -717,34 +726,36 @@ public class Message {
 
     if (!MessageType.validate(type)) {
       throw new 
IOException(LocalizedStrings.Message_INVALID_MESSAGE_TYPE_0_WHILE_READING_HEADER
-          .toLocalizedString(Integer.valueOf(type)));
+          .toLocalizedString(type));
     }
+    
     int timeToWait = 0;
-    if (this.sc != null) {
+    if (this.serverConnection != null) {
       // Keep track of the fact that a message is being processed.
-      this.sc.setProcessingMessage();
-      timeToWait = sc.getClientReadTimeout();
+      this.serverConnection.setProcessingMessage();
+      timeToWait = this.serverConnection.getClientReadTimeout();
     }
-    this.hdrRead = true;
-    if (this.msgLimiter != null) {
+    this.readHeader = true;
+    
+    if (this.messageLimiter != null) {
       for (;;) {
-        this.sc.getCachedRegionHelper().checkCancelInProgress(null);
+        
this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null);
         boolean interrupted = Thread.interrupted();
         try {
           if (timeToWait == 0) {
-            this.msgLimiter.acquire(1);
+            this.messageLimiter.acquire(1);
           } else {
-            if (!this.msgLimiter.tryAcquire(1, timeToWait, 
TimeUnit.MILLISECONDS)) {
-              if (this.msgStats != null && this.msgStats instanceof 
CacheServerStats) {
-                ((CacheServerStats) this.msgStats).incConnectionsTimedOut();
+            if (!this.messageLimiter.tryAcquire(1, timeToWait, 
TimeUnit.MILLISECONDS)) {
+              if (this.messageStats instanceof CacheServerStats) {
+                ((CacheServerStats) 
this.messageStats).incConnectionsTimedOut();
               }
               throw new IOException(
                   
LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_MESSAGE_LIMITER_AFTER_WAITING_0_MILLISECONDS
-                      .toLocalizedString(Integer.valueOf(timeToWait)));
+                      .toLocalizedString(timeToWait));
             }
           }
           break;
-        } catch (InterruptedException e) {
+        } catch (InterruptedException ignore) {
           interrupted = true;
         } finally {
           if (interrupted) {
@@ -753,16 +764,19 @@ public class Message {
         }
       } // for
     }
+    
     if (len > 0) {
       if (this.maxIncomingMessageLength > 0 && len > 
this.maxIncomingMessageLength) {
         throw new 
IOException(LocalizedStrings.Message_MESSAGE_SIZE_0_EXCEEDED_MAX_LIMIT_OF_1
-            .toLocalizedString(new Object[] {Integer.valueOf(len),
-                Integer.valueOf(this.maxIncomingMessageLength)}));
+            .toLocalizedString(new Object[] {
+              len, this.maxIncomingMessageLength
+            }));
       }
+      
       if (this.dataLimiter != null) {
         for (;;) {
-          if (sc != null) {
-            this.sc.getCachedRegionHelper().checkCancelInProgress(null);
+          if (this.serverConnection != null) {
+            
this.serverConnection.getCachedRegionHelper().checkCancelInProgress(null);
           }
           boolean interrupted = Thread.interrupted();
           try {
@@ -770,21 +784,21 @@ public class Message {
               this.dataLimiter.acquire(len);
             } else {
               int newTimeToWait = timeToWait;
-              if (this.msgLimiter != null) {
+              if (this.messageLimiter != null) {
                 // may have waited for msg limit so recalc time to wait
-                newTimeToWait -= (int) sc.getCurrentMessageProcessingTime();
+                newTimeToWait -= (int) 
this.serverConnection.getCurrentMessageProcessingTime();
               }
               if (newTimeToWait <= 0
-                  || !this.msgLimiter.tryAcquire(1, newTimeToWait, 
TimeUnit.MILLISECONDS)) {
+                  || !this.messageLimiter.tryAcquire(1, newTimeToWait, 
TimeUnit.MILLISECONDS)) {
                 throw new IOException(
                     
LocalizedStrings.Message_OPERATION_TIMED_OUT_ON_SERVER_WAITING_ON_CONCURRENT_DATA_LIMITER_AFTER_WAITING_0_MILLISECONDS
                         .toLocalizedString(timeToWait));
               }
             }
-            this.payloadLength = len; // makes sure payloadLength gets set now 
so we will release
-                                      // the semaphore
+            // makes sure payloadLength gets set now so we will release the 
semaphore
+            this.payloadLength = len;
             break; // success
-          } catch (InterruptedException e) {
+          } catch (InterruptedException ignore) {
             interrupted = true;
           } finally {
             if (interrupted) {
@@ -794,15 +808,15 @@ public class Message {
         }
       }
     }
-    if (this.msgStats != null) {
-      this.msgStats.incMessagesBeingReceived(len);
+    if (this.messageStats != null) {
+      this.messageStats.incMessagesBeingReceived(len);
       this.payloadLength = len; // makes sure payloadLength gets set now so we 
will dec on clear
     }
 
     this.isRetry = (bits & MESSAGE_IS_RETRY) != 0;
-    bits = (byte) (bits & MESSAGE_IS_RETRY_MASK);
+    bits &= MESSAGE_IS_RETRY_MASK;
     this.flags = bits;
-    this.msgType = type;
+    this.messageType = type;
 
     readPayloadFields(numParts, len);
 
@@ -813,32 +827,38 @@ public class Message {
     // this.numberOfParts = numParts; Already set in setPayloadFields via 
setNumberOfParts
     this.transactionId = txid;
     this.flags = bits;
-    if (this.sc != null) {
+    if (this.serverConnection != null) {
       // Keep track of the fact that a message is being processed.
-      this.sc.updateProcessingMessage();
+      this.serverConnection.updateProcessingMessage();
     }
   }
 
-  protected void readPayloadFields(final int numParts, final int len) throws 
IOException {
+  /**
+   * TODO: refactor overly long method readPayloadFields
+   */
+  void readPayloadFields(final int numParts, final int len) throws IOException 
{
     if (len > 0 && numParts <= 0 || len <= 0 && numParts > 0) {
       throw new IOException(
           
LocalizedStrings.Message_PART_LENGTH_0_AND_NUMBER_OF_PARTS_1_INCONSISTENT
-              .toLocalizedString(new Object[] {Integer.valueOf(len), 
Integer.valueOf(numParts)}));
+              .toLocalizedString(new Object[] { len, numParts }));
     }
 
-    Integer msgType = messageType.get();
+    Integer msgType = MESSAGE_TYPE.get();
     if (msgType != null && msgType == MessageType.PING) {
-      messageType.set(null); // set it to null right away.
-      int pingParts = 10; // Some number which will not throw OOM but still be 
acceptable for a ping
-                          // operation.
+      // set it to null right away.
+      MESSAGE_TYPE.set(null);
+      // Some number which will not throw OOM but still be acceptable for a 
ping operation.
+      int pingParts = 10;
       if (numParts > pingParts) {
         throw new IOException("Part length ( " + numParts + " ) is  
inconsistent for "
             + MessageType.getString(msgType) + " operation.");
       }
     }
+    
     setNumberOfParts(numParts);
-    if (numParts <= 0)
+    if (numParts <= 0) {
       return;
+    }
 
     if (len < 0) {
       
logger.info(LocalizedMessage.create(LocalizedStrings.Message_RPL_NEG_LEN__0, 
len));
@@ -849,12 +869,10 @@ public class Message {
     cb.clear();
     cb.flip();
 
-    int readSecurePart = 0;
-    readSecurePart = checkAndSetSecurityPart();
+    int readSecurePart = checkAndSetSecurityPart();
 
     int bytesRemaining = len;
-    for (int i = 0; ((i < numParts + readSecurePart)
-        || ((readSecurePart == 1) && (cb.remaining() > 0))); i++) {
+    for (int i = 0; i < numParts + readSecurePart || readSecurePart == 1 && 
cb.remaining() > 0; i++) {
       int bytesReadThisTime = readPartChunk(bytesRemaining);
       bytesRemaining -= bytesReadThisTime;
 
@@ -869,6 +887,7 @@ public class Message {
       int partLen = cb.getInt();
       byte partType = cb.get();
       byte[] partBytes = null;
+      
       if (partLen > 0) {
         partBytes = new byte[partLen];
         int alreadyReadBytes = cb.remaining();
@@ -878,26 +897,27 @@ public class Message {
           }
           cb.get(partBytes, 0, alreadyReadBytes);
         }
+        
         // now we need to read partLen - alreadyReadBytes off the wire
         int off = alreadyReadBytes;
         int remaining = partLen - off;
         while (remaining > 0) {
-          if (this.sockCh != null) {
+          if (this.socketChannel != null) {
             int bytesThisTime = remaining;
             cb.clear();
             if (bytesThisTime > cb.capacity()) {
               bytesThisTime = cb.capacity();
             }
             cb.limit(bytesThisTime);
-            int res = this.sockCh.read(cb);
+            int res = this.socketChannel.read(cb);
             if (res != -1) {
               cb.flip();
               bytesRemaining -= res;
               remaining -= res;
               cb.get(partBytes, off, res);
               off += res;
-              if (this.msgStats != null) {
-                this.msgStats.incReceivedBytes(res);
+              if (this.messageStats != null) {
+                this.messageStats.incReceivedBytes(res);
               }
             } else {
               throw new EOFException(
@@ -905,14 +925,13 @@ public class Message {
                       .toLocalizedString());
             }
           } else {
-            int res = 0;
-            res = this.is.read(partBytes, off, remaining);
+            int res = this.inputStream.read(partBytes, off, remaining);
             if (res != -1) {
               bytesRemaining -= res;
               remaining -= res;
               off += res;
-              if (this.msgStats != null) {
-                this.msgStats.incReceivedBytes(res);
+              if (this.messageStats != null) {
+                this.messageStats.incReceivedBytes(res);
               }
             } else {
               throw new EOFException(
@@ -941,35 +960,38 @@ public class Message {
    * @return the number of bytes read into commBuffer
    */
   private int readPartChunk(int bytesRemaining) throws IOException {
-    final ByteBuffer cb = getCommBuffer();
-    if (cb.remaining() >= PART_HEADER_SIZE) {
+    final ByteBuffer commBuffer = getCommBuffer();
+    if (commBuffer.remaining() >= PART_HEADER_SIZE) {
       // we already have the next part header in commBuffer so just return
       return 0;
     }
-    if (cb.position() != 0) {
-      cb.compact();
+    
+    if (commBuffer.position() != 0) {
+      commBuffer.compact();
     } else {
-      cb.position(cb.limit());
-      cb.limit(cb.capacity());
+      commBuffer.position(commBuffer.limit());
+      commBuffer.limit(commBuffer.capacity());
     }
-    int bytesRead = 0;
-    if (this.sc != null) {
+    
+    if (this.serverConnection != null) {
       // Keep track of the fact that we are making progress
-      this.sc.updateProcessingMessage();
+      this.serverConnection.updateProcessingMessage();
     }
-    if (this.sockCh != null) {
-      int remaining = cb.remaining();
+    int bytesRead = 0;
+    
+    if (this.socketChannel != null) {
+      int remaining = commBuffer.remaining();
       if (remaining > bytesRemaining) {
         remaining = bytesRemaining;
-        cb.limit(cb.position() + bytesRemaining);
+        commBuffer.limit(commBuffer.position() + bytesRemaining);
       }
       while (remaining > 0) {
-        int res = this.sockCh.read(cb);
+        int res = this.socketChannel.read(commBuffer);
         if (res != -1) {
           remaining -= res;
           bytesRead += res;
-          if (this.msgStats != null) {
-            this.msgStats.incReceivedBytes(res);
+          if (this.messageStats != null) {
+            this.messageStats.incReceivedBytes(res);
           }
         } else {
           throw new EOFException(
@@ -979,21 +1001,20 @@ public class Message {
       }
 
     } else {
-      int bufSpace = cb.capacity() - cb.position();
-      int bytesToRead = bufSpace;
+      int bytesToRead = commBuffer.capacity() - commBuffer.position();
       if (bytesRemaining < bytesToRead) {
         bytesToRead = bytesRemaining;
       }
-      int pos = cb.position();
+      int pos = commBuffer.position();
+      
       while (bytesToRead > 0) {
-        int res = 0;
-        res = this.is.read(cb.array(), pos, bytesToRead);
+        int res = this.inputStream.read(commBuffer.array(), pos, bytesToRead);
         if (res != -1) {
           bytesToRead -= res;
           pos += res;
           bytesRead += res;
-          if (this.msgStats != null) {
-            this.msgStats.incReceivedBytes(res);
+          if (this.messageStats != null) {
+            this.messageStats.incReceivedBytes(res);
           }
         } else {
           throw new EOFException(
@@ -1001,9 +1022,10 @@ public class Message {
                   .toLocalizedString());
         }
       }
-      cb.position(pos);
+      
+      commBuffer.position(pos);
     }
-    cb.flip();
+    commBuffer.flip();
     return bytesRead;
   }
 
@@ -1011,40 +1033,39 @@ public class Message {
    * Gets rid of all the parts that have been added to this message.
    */
   public void clearParts() {
-    for (int i = 0; i < partsList.length; i++) {
-      partsList[i].clear();
+    for (Part part : this.partsList) {
+      part.clear();
     }
     this.currentPart = 0;
   }
 
   @Override
   public String toString() {
-    StringBuffer sb = new StringBuffer();
-    sb.append("type=").append(MessageType.getString(msgType));
-    sb.append("; payloadLength=").append(payloadLength);
-    sb.append("; numberOfParts=").append(numberOfParts);
-    sb.append("; transactionId=").append(transactionId);
-    sb.append("; currentPart=").append(currentPart);
-    sb.append("; messageModified=").append(messageModified);
-    sb.append("; flags=").append(Integer.toHexString(flags));
-    for (int i = 0; i < numberOfParts; i++) {
+    StringBuilder sb = new StringBuilder();
+    sb.append("type=").append(MessageType.getString(this.messageType));
+    sb.append("; payloadLength=").append(this.payloadLength);
+    sb.append("; numberOfParts=").append(this.numberOfParts);
+    sb.append("; transactionId=").append(this.transactionId);
+    sb.append("; currentPart=").append(this.currentPart);
+    sb.append("; messageModified=").append(this.messageModified);
+    sb.append("; flags=").append(Integer.toHexString(this.flags));
+    for (int i = 0; i < this.numberOfParts; i++) {
       sb.append("; part[").append(i).append("]={");
-      sb.append(this.partsList[i].toString());
+      sb.append(this.partsList[i]);
       sb.append("}");
     }
     return sb.toString();
   }
 
-
-  public void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, 
MessageStats msgStats)
+  void setComms(ServerConnection sc, Socket socket, ByteBuffer bb, 
MessageStats msgStats)
       throws IOException {
-    this.sc = sc;
+    this.serverConnection = sc;
     setComms(socket, bb, msgStats);
   }
 
-  public void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) 
throws IOException {
-    this.sockCh = socket.getChannel();
-    if (this.sockCh == null) {
+  void setComms(Socket socket, ByteBuffer bb, MessageStats msgStats) throws 
IOException {
+    this.socketChannel = socket.getChannel();
+    if (this.socketChannel == null) {
       setComms(socket, socket.getInputStream(), socket.getOutputStream(), bb, 
msgStats);
     } else {
       setComms(socket, null, null, bb, msgStats);
@@ -1052,14 +1073,14 @@ public class Message {
   }
 
   public void setComms(Socket socket, InputStream is, OutputStream os, ByteBuffer bb,
-      MessageStats msgStats) throws IOException {
+      MessageStats msgStats) {
     Assert.assertTrue(socket != null);
     this.socket = socket;
-    this.sockCh = socket.getChannel();
-    this.is = is;
-    this.os = os;
+    this.socketChannel = socket.getChannel();
+    this.inputStream = is;
+    this.outputStream = os;
     this.cachedCommBuffer = bb;
-    this.msgStats = msgStats;
+    this.messageStats = msgStats;
   }
 
   /**
@@ -1069,11 +1090,11 @@ public class Message {
    */
   public void unsetComms() {
     this.socket = null;
-    this.sockCh = null;
-    this.is = null;
-    this.os = null;
+    this.socketChannel = null;
+    this.inputStream = null;
+    this.outputStream = null;
     this.cachedCommBuffer = null;
-    this.msgStats = null;
+    this.messageStats = null;
   }
 
   /**
@@ -1084,7 +1105,7 @@ public class Message {
   }
 
   public void send(ServerConnection servConn) throws IOException {
-    if (this.sc != servConn)
+    if (this.serverConnection != servConn)
       throw new IllegalStateException("this.sc was not correctly set");
     send(true);
   }
@@ -1097,7 +1118,7 @@ public class Message {
   }
 
   /**
-   * Populates the stats of this <code>Message</code> with information received via its socket
+   * Populates the stats of this {@code Message} with information received via its socket
    */
   public void recv() throws IOException {
     if (this.socket != null) {
@@ -1111,10 +1132,10 @@ public class Message {
 
   public void recv(ServerConnection sc, int maxMessageLength, Semaphore dataLimiter,
       Semaphore msgLimiter) throws IOException {
-    this.sc = sc;
+    this.serverConnection = sc;
     this.maxIncomingMessageLength = maxMessageLength;
     this.dataLimiter = dataLimiter;
-    this.msgLimiter = msgLimiter;
+    this.messageLimiter = msgLimiter;
     recv();
   }
 

http://git-wip-us.apache.org/repos/asf/geode/blob/b4bf200d/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
index 83d0e9d..dfda14f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerConnection.java
@@ -723,12 +723,7 @@ public class ServerConnection implements Runnable {
     ThreadState threadState = null;
     try {
       if (msg != null) {
-        // this.logger.fine("donormalMsg() msgType " + msg.getMessageType());
-        // Since this thread is not interrupted when the cache server is
-        // shutdown,
-        // test again after a message has been read. This is a bit of a hack. I
-        // think this thread should be interrupted, but currently AcceptorImpl
-        // doesn't keep track of the threads that it launches.
+        // Since this thread is not interrupted when the cache server is shutdown, test again after a message has been read. This is a bit of a hack. I think this thread should be interrupted, but currently AcceptorImpl doesn't keep track of the threads that it launches.
         if (!this.processMessages || (crHelper.isShutdown())) {
           if (logger.isDebugEnabled()) {
             logger.debug("{} ignoring message of type {} from client {} due to 
shutdown.",
@@ -1078,8 +1073,6 @@ public class ServerConnection implements Runnable {
    */
   public Part updateAndGetSecurityPart() {
     // need to take care all message types here
-    // this.logger.fine("getSecurityPart() msgType = "
-    // + this.requestMsg.msgType);
     if (AcceptorImpl.isAuthenticationRequired()
         && this.handshake.getVersion().compareTo(Version.GFE_65) >= 0
         && (this.communicationMode != Acceptor.GATEWAY_TO_GATEWAY)
@@ -1090,40 +1083,40 @@ public class ServerConnection implements Runnable {
       if (AcceptorImpl.isAuthenticationRequired() && logger.isDebugEnabled()) {
         logger.debug(
             "ServerConnection.updateAndGetSecurityPart() not adding security 
part for msg type {}",
-            MessageType.getString(this.requestMsg.msgType));
+            MessageType.getString(this.requestMsg.messageType));
       }
     }
     return null;
   }
 
   private boolean isInternalMessage() {
-    return (this.requestMsg.msgType == MessageType.CLIENT_READY
-        || this.requestMsg.msgType == MessageType.CLOSE_CONNECTION
-        || this.requestMsg.msgType == MessageType.GETCQSTATS_MSG_TYPE
-        || this.requestMsg.msgType == MessageType.GET_CLIENT_PARTITION_ATTRIBUTES
-        || this.requestMsg.msgType == MessageType.GET_CLIENT_PR_METADATA
-        || this.requestMsg.msgType == MessageType.INVALID
-        || this.requestMsg.msgType == MessageType.MAKE_PRIMARY
-        || this.requestMsg.msgType == MessageType.MONITORCQ_MSG_TYPE
-        || this.requestMsg.msgType == MessageType.PERIODIC_ACK
-        || this.requestMsg.msgType == MessageType.PING
-        || this.requestMsg.msgType == MessageType.REGISTER_DATASERIALIZERS
-        || this.requestMsg.msgType == MessageType.REGISTER_INSTANTIATORS
-        || this.requestMsg.msgType == MessageType.REQUEST_EVENT_VALUE
-        || this.requestMsg.msgType == MessageType.ADD_PDX_TYPE
-        || this.requestMsg.msgType == MessageType.GET_PDX_ID_FOR_TYPE
-        || this.requestMsg.msgType == MessageType.GET_PDX_TYPE_BY_ID
-        || this.requestMsg.msgType == MessageType.SIZE
-        || this.requestMsg.msgType == MessageType.TX_FAILOVER
-        || this.requestMsg.msgType == MessageType.TX_SYNCHRONIZATION
-        || this.requestMsg.msgType == MessageType.GET_FUNCTION_ATTRIBUTES
-        || this.requestMsg.msgType == MessageType.ADD_PDX_ENUM
-        || this.requestMsg.msgType == MessageType.GET_PDX_ID_FOR_ENUM
-        || this.requestMsg.msgType == MessageType.GET_PDX_ENUM_BY_ID
-        || this.requestMsg.msgType == MessageType.GET_PDX_TYPES
-        || this.requestMsg.msgType == MessageType.GET_PDX_ENUMS
-        || this.requestMsg.msgType == MessageType.COMMIT
-        || this.requestMsg.msgType == MessageType.ROLLBACK);
+    return (this.requestMsg.messageType == MessageType.CLIENT_READY
+        || this.requestMsg.messageType == MessageType.CLOSE_CONNECTION
+        || this.requestMsg.messageType == MessageType.GETCQSTATS_MSG_TYPE
+        || this.requestMsg.messageType == MessageType.GET_CLIENT_PARTITION_ATTRIBUTES
+        || this.requestMsg.messageType == MessageType.GET_CLIENT_PR_METADATA
+        || this.requestMsg.messageType == MessageType.INVALID
+        || this.requestMsg.messageType == MessageType.MAKE_PRIMARY
+        || this.requestMsg.messageType == MessageType.MONITORCQ_MSG_TYPE
+        || this.requestMsg.messageType == MessageType.PERIODIC_ACK
+        || this.requestMsg.messageType == MessageType.PING
+        || this.requestMsg.messageType == MessageType.REGISTER_DATASERIALIZERS
+        || this.requestMsg.messageType == MessageType.REGISTER_INSTANTIATORS
+        || this.requestMsg.messageType == MessageType.REQUEST_EVENT_VALUE
+        || this.requestMsg.messageType == MessageType.ADD_PDX_TYPE
+        || this.requestMsg.messageType == MessageType.GET_PDX_ID_FOR_TYPE
+        || this.requestMsg.messageType == MessageType.GET_PDX_TYPE_BY_ID
+        || this.requestMsg.messageType == MessageType.SIZE
+        || this.requestMsg.messageType == MessageType.TX_FAILOVER
+        || this.requestMsg.messageType == MessageType.TX_SYNCHRONIZATION
+        || this.requestMsg.messageType == MessageType.GET_FUNCTION_ATTRIBUTES
+        || this.requestMsg.messageType == MessageType.ADD_PDX_ENUM
+        || this.requestMsg.messageType == MessageType.GET_PDX_ID_FOR_ENUM
+        || this.requestMsg.messageType == MessageType.GET_PDX_ENUM_BY_ID
+        || this.requestMsg.messageType == MessageType.GET_PDX_TYPES
+        || this.requestMsg.messageType == MessageType.GET_PDX_ENUMS
+        || this.requestMsg.messageType == MessageType.COMMIT
+        || this.requestMsg.messageType == MessageType.ROLLBACK);
   }
 
   public void run() {

http://git-wip-us.apache.org/repos/asf/geode/blob/b4bf200d/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 4e450c7..1afe6ff 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -2149,7 +2149,7 @@ public class Connection implements Runnable {
                 logger.fatal(LocalizedMessage
                     
.create(LocalizedStrings.Connection_FAILED_HANDLING_CHUNK_MESSAGE), ex);
               }
-            } else /* (msgType == END_CHUNKED_MSG_TYPE) */ {
+            } else /* (messageType == END_CHUNKED_MSG_TYPE) */ {
               MsgDestreamer md = obtainMsgDestreamer(msgId, remoteVersion);
               this.owner.getConduit().stats.incMessagesBeingReceived(md.size() 
== 0, len);
               try {

http://git-wip-us.apache.org/repos/asf/geode/blob/b4bf200d/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java
----------------------------------------------------------------------
diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java b/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java
index 031f827..80b16fc 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/IOUtils.java
@@ -30,8 +30,7 @@ import java.io.ObjectStreamClass;
 
 /**
  * Reusable Input/Output operation utility methods.
- * <p/>
- * 
+ *
  * @since GemFire 6.6
  */
 @SuppressWarnings("unused")
@@ -44,8 +43,7 @@ public abstract class IOUtils {
    * File.separator character. If the pathname is unspecified (null, empty or 
blank) then path
    * elements are considered relative to file system root, beginning with 
File.separator. If array
    * of path elements are null, then the pathname is returned as is.
-   * </p>
-   * 
+   *
    * @param pathname a String value indicating the base pathname.
    * @param pathElements the path elements to append to pathname.
    * @return the path elements appended to the pathname.

http://git-wip-us.apache.org/repos/asf/geode/blob/b4bf200d/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
index 86fcbce..b2d903c 100755
--- a/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
+++ b/geode-core/src/test/java/org/apache/geode/internal/cache/tier/sockets/MessageJUnitTest.java
@@ -32,53 +32,49 @@ import org.apache.geode.test.junit.categories.UnitTest;
 public class MessageJUnitTest {
 
   private Message message;
-  private Socket mockSocket;
-  private MessageStats mockStats;
-  private ByteBuffer msgBuffer;
-  private ServerConnection mockServerConnection;
 
   @Before
   public void setUp() throws Exception {
-    mockSocket = mock(Socket.class);
-    message = new Message(2, Version.CURRENT);
-    assertEquals(2, message.getNumberOfParts());
-    mockStats = mock(MessageStats.class);
-    msgBuffer = ByteBuffer.allocate(1000);
-    mockServerConnection = mock(ServerConnection.class);
-    message.setComms(mockServerConnection, mockSocket, msgBuffer, mockStats);
+    Socket mockSocket = mock(Socket.class);
+    this.message = new Message(2, Version.CURRENT);
+    assertEquals(2, this.message.getNumberOfParts());
+    MessageStats mockStats = mock(MessageStats.class);
+    ByteBuffer msgBuffer = ByteBuffer.allocate(1000);
+    ServerConnection mockServerConnection = mock(ServerConnection.class);
+    this.message.setComms(mockServerConnection, mockSocket, msgBuffer, mockStats);
   }
 
   @Test
   public void clearDoesNotThrowNPE() throws Exception {
     // unsetComms clears the message's ByteBuffer, which was causing an NPE 
during shutdown
     // when clear() was invoked
-    message.unsetComms();
-    message.clear();
+    this.message.unsetComms();
+    this.message.clear();
   }
 
   @Test
   public void numberOfPartsIsAdjusted() {
-    int numParts = message.getNumberOfParts();
-    message.setNumberOfParts(2 * numParts + 1);
-    assertEquals(2 * numParts + 1, message.getNumberOfParts());
-    message.addBytesPart(new byte[1]);
-    message.addIntPart(2);
-    message.addLongPart(3);
-    message.addObjPart("4");
-    message.addStringPart("5");
-    assertEquals(5, message.getNextPartNumber());
+    int numParts = this.message.getNumberOfParts();
+    this.message.setNumberOfParts(2 * numParts + 1);
+    assertEquals(2 * numParts + 1, this.message.getNumberOfParts());
+    this.message.addBytesPart(new byte[1]);
+    this.message.addIntPart(2);
+    this.message.addLongPart(3);
+    this.message.addObjPart("4");
+    this.message.addStringPart("5");
+    assertEquals(5, this.message.getNextPartNumber());
   }
 
   @Test
   public void messageLongerThanMaxIntIsRejected() throws Exception {
-    Part[] parts = new Part[2];
     Part mockPart1 = mock(Part.class);
     when(mockPart1.getLength()).thenReturn(Integer.MAX_VALUE / 2);
+    Part[] parts = new Part[2];
     parts[0] = mockPart1;
     parts[1] = mockPart1;
-    message.setParts(parts);
+    this.message.setParts(parts);
     try {
-      message.send();
+      this.message.send();
       fail("expected an exception but none was thrown");
     } catch (MessageTooLargeException e) {
       assertTrue(e.getMessage().contains("exceeds maximum integer value"));
@@ -87,14 +83,14 @@ public class MessageJUnitTest {
 
   @Test
   public void maxMessageSizeIsRespected() throws Exception {
-    Part[] parts = new Part[2];
     Part mockPart1 = mock(Part.class);
-    when(mockPart1.getLength()).thenReturn(Message.MAX_MESSAGE_SIZE / 2);
+    when(mockPart1.getLength()).thenReturn(Message.DEFAULT_MAX_MESSAGE_SIZE / 2);
+    Part[] parts = new Part[2];
     parts[0] = mockPart1;
     parts[1] = mockPart1;
-    message.setParts(parts);
+    this.message.setParts(parts);
     try {
-      message.send();
+      this.message.send();
       fail("expected an exception but none was thrown");
     } catch (MessageTooLargeException e) {
       assertFalse(e.getMessage().contains("exceeds maximum integer value"));
@@ -103,21 +99,17 @@ public class MessageJUnitTest {
 
   /**
    * geode-1468: Message should clear the chunks in its Parts when performing 
cleanup.
-   * 
-   * @throws Exception
    */
   @Test
   public void streamBuffersAreClearedDuringCleanup() throws Exception {
-    Part[] parts = new Part[2];
     Part mockPart1 = mock(Part.class);
     when(mockPart1.getLength()).thenReturn(100);
+    Part[] parts = new Part[2];
     parts[0] = mockPart1;
     parts[1] = mockPart1;
-    message.setParts(parts);
-    message.clearParts();
+    this.message.setParts(parts);
+    this.message.clearParts();
     verify(mockPart1, times(2)).clear();
   }
 
-  // TODO many more tests are needed
-
 }

http://git-wip-us.apache.org/repos/asf/geode/blob/b4bf200d/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
----------------------------------------------------------------------
diff --git a/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java b/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
index 5a679bb..110d649 100644
--- a/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
+++ b/geode-core/src/test/java/org/apache/geode/test/dunit/internal/JUnit4DistributedTestCase.java
@@ -600,11 +600,11 @@ public abstract class JUnit4DistributedTestCase implements DistributedTestFixtur
     RegionTestCase.preSnapshotRegion = null;
     SocketCreator.resetHostNameCache();
     SocketCreator.resolve_dns = true;
-    Message.MAX_MESSAGE_SIZE = Message.DEFAULT_MAX_MESSAGE_SIZE;
 
     // clear system properties -- keep alphabetized
     System.clearProperty(DistributionConfig.GEMFIRE_PREFIX + "log-level");
     System.clearProperty("jgroups.resolve_dns");
+    System.clearProperty(Message.MAX_MESSAGE_SIZE_PROPERTY);
 
     if (InternalDistributedSystem.systemAttemptingReconnect != null) {
       InternalDistributedSystem.systemAttemptingReconnect.stopReconnecting();

http://git-wip-us.apache.org/repos/asf/geode/blob/b4bf200d/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
----------------------------------------------------------------------
diff --git 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
index f403447..8cedbf0 100644
--- 
a/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
+++ 
b/geode-wan/src/test/java/org/apache/geode/internal/cache/wan/parallel/ParallelGatewaySenderOperationsDUnitTest.java
@@ -14,8 +14,10 @@
  */
 package org.apache.geode.internal.cache.wan.parallel;
 
+import static org.apache.geode.internal.cache.tier.sockets.Message.MAX_MESSAGE_SIZE_PROPERTY;
 import static org.apache.geode.test.dunit.Assert.*;
 
+import org.junit.Rule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
@@ -31,8 +33,8 @@ import org.apache.geode.test.dunit.IgnoredException;
 import org.apache.geode.test.dunit.LogWriterUtils;
 import org.apache.geode.test.dunit.RMIException;
 import org.apache.geode.test.dunit.Wait;
+import org.apache.geode.test.dunit.rules.DistributedRestoreSystemProperties;
 import org.apache.geode.test.junit.categories.DistributedTest;
-import org.apache.geode.test.junit.categories.FlakyTest;
 
 /**
  * DUnit test for operations on ParallelGatewaySender
@@ -40,6 +42,9 @@ import org.apache.geode.test.junit.categories.FlakyTest;
 @Category(DistributedTest.class)
 public class ParallelGatewaySenderOperationsDUnitTest extends WANTestBase {
 
+  @Rule
+  public DistributedRestoreSystemProperties restoreSystemProperties = new DistributedRestoreSystemProperties();
+
   @Override
   protected final void postSetUpWANTestBase() throws Exception {
     IgnoredException.addIgnoredException("Broken pipe||Unexpected 
IOException");
@@ -582,13 +587,14 @@ public class ParallelGatewaySenderOperationsDUnitTest 
extends WANTestBase {
 
   @Test
   public void testParallelGatewaySenderMessageTooLargeException() {
+    vm4.invoke(() -> System.setProperty(MAX_MESSAGE_SIZE_PROPERTY, String.valueOf(1024 * 1024)));
+
     Integer[] locatorPorts = createLNAndNYLocators();
     Integer lnPort = locatorPorts[0];
     Integer nyPort = locatorPorts[1];
 
     // Create and start sender with reduced maximum message size and 1 
dispatcher thread
     String regionName = getTestMethodName() + "_PR";
-    vm4.invoke(() -> setMaximumMessageSize(1024 * 1024));
     vm4.invoke(() -> createCache(lnPort));
     vm4.invoke(() -> setNumDispatcherThreadsForTheRun(1));
     vm4.invoke(() -> createSender("ln", 2, true, 100, 100, false, false, null, 
false));
@@ -617,12 +623,6 @@ public class ParallelGatewaySenderOperationsDUnitTest 
extends WANTestBase {
     ignoredGIOE.remove();
   }
 
-  private void setMaximumMessageSize(int maximumMessageSizeBytes) {
-    Message.MAX_MESSAGE_SIZE = maximumMessageSizeBytes;
-    LogWriterUtils.getLogWriter().info("Set gemfire.client.max-message-size: "
-        + System.getProperty(DistributionConfig.GEMFIRE_PREFIX + 
"client.max-message-size"));
-  }
-
   private void createSendersReceiversAndPartitionedRegion(Integer lnPort, 
Integer nyPort,
       boolean createAccessors, boolean startSenders) {
     // Note: This is a test-specific method used by several test to create

Reply via email to