This is an automated email from the ASF dual-hosted git repository.

bschuchardt pushed a commit to branch dsfid_separation_wip
in repository https://gitbox.apache.org/repos/asf/geode.git

commit 26e6aabf9729de216d726f135f707d4dfdce94c9
Author: Bruce Schuchardt <bschucha...@pivotal.io>
AuthorDate: Fri Aug 23 15:50:31 2019 -0700

    introduction of SerializationVersion, a superclass of Version
---
 ...ackwardCompatibilitySerializationDUnitTest.java |  11 +-
 .../cache/client/ClientCacheFactoryJUnitTest.java  |   4 +-
 .../internal/InternalDataSerializerBenchmark.java  |   2 +-
 .../client/internal/ClientSideHandshakeImpl.java   |  16 +--
 .../membership/InternalDistributedMember.java      |  13 +-
 .../internal/membership/gms/GMSMember.java         |  15 ++-
 .../membership/gms/messenger/JGroupsMessenger.java |   7 +-
 .../distributed/internal/tcpserver/TcpClient.java  |  17 ++-
 .../distributed/internal/tcpserver/TcpServer.java  |   4 +-
 .../geode/internal/HeapDataOutputStream.java       |   4 +-
 .../geode/internal/InternalDataSerializer.java     |  35 +++---
 .../java/org/apache/geode/internal/Version.java    | 130 ++-----------------
 .../geode/internal/VersionedObjectInput.java       |   8 +-
 .../geode/internal/VersionedObjectOutput.java      |   8 +-
 .../apache/geode/internal/cache/DiskInitFile.java  |   3 +-
 .../org/apache/geode/internal/cache/EventID.java   |   2 +-
 .../geode/internal/cache/FilterRoutingInfo.java    |   2 +-
 .../org/apache/geode/internal/cache/Oplog.java     |  10 +-
 .../cache/persistence/DiskInitFileParser.java      |   3 +-
 .../tier/sockets/ClientRegistrationMetadata.java   |   7 +-
 .../tier/sockets/ServerSideHandshakeFactory.java   |   3 +-
 .../tier/sockets/ServerSideHandshakeImpl.java      |  19 +--
 .../internal/cache/wan/GatewaySenderEventImpl.java |   2 +-
 .../serialization/BufferDataOutputStream.java      |  11 +-
 .../internal/serialization/ByteArrayDataInput.java |  17 +--
 .../internal/serialization/DSFIDSerializer.java    |  15 ++-
 .../serialization/SerializationVersion.java        | 140 +++++++++++++++++++++
 .../serialization/SerializationVersions.java       |   4 +-
 .../serialization/VersionedDataInputStream.java    |   6 +-
 .../serialization/VersionedDataOutputStream.java   |   6 +-
 .../serialization/VersionedDataStream.java         |   4 +-
 .../org/apache/geode/internal/tcp/Connection.java  |   4 +-
 .../apache/geode/internal/tcp/MsgDestreamer.java   |   2 +-
 .../tcp/VersionedByteBufferInputStream.java        |   8 +-
 .../geode/internal/tcp/VersionedMsgStreamer.java   |   8 +-
 .../org/apache/geode/internal/util/BlobHelper.java |   2 +-
 .../java/org/apache/geode/DataSerializerTest.java  |   4 +-
 .../cache/execute/FunctionAdapterJUnitTest.java    |   2 +-
 .../membership/gms/GMSMemberJUnitTest.java         |   2 +-
 .../geode/internal/ByteArrayDataInputTest.java     |   2 +-
 .../geode/internal/DataSerializableJUnitTest.java  |   4 +-
 .../geode/internal/cache/FilterInfoTest.java       |   2 +-
 .../cache/ha/EventIdOptimizationJUnitTest.java     |   4 +-
 .../internal/results/PageEntryJUnitTest.java       |   2 +-
 .../apache/geode/OldClientSupportDUnitTest.java    |  10 +-
 .../gemstone/gemfire/OldClientSupportProvider.java |   3 +-
 46 files changed, 319 insertions(+), 268 deletions(-)

diff --git 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/BackwardCompatibilitySerializationDUnitTest.java
 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/BackwardCompatibilitySerializationDUnitTest.java
index 7a0396c..b6a774c 100755
--- 
a/geode-core/src/distributedTest/java/org/apache/geode/internal/BackwardCompatibilitySerializationDUnitTest.java
+++ 
b/geode-core/src/distributedTest/java/org/apache/geode/internal/BackwardCompatibilitySerializationDUnitTest.java
@@ -35,6 +35,7 @@ import org.junit.experimental.categories.Category;
 import org.apache.geode.cache.Cache;
 import 
org.apache.geode.internal.cache.DistributedPutAllOperation.EntryVersionsList;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.serialization.SerializationVersions;
 import org.apache.geode.internal.serialization.VersionedDataInputStream;
 import org.apache.geode.internal.serialization.VersionedDataOutputStream;
@@ -93,7 +94,7 @@ public class BackwardCompatibilitySerializationDUnitTest 
extends JUnit4CacheTest
   @Test
   public void testToDataFromHigherVersionToLower() throws Exception {
     DataOutputStream dos =
-        new VersionedDataOutputStream(new DataOutputStream(baos), 
Version.GFE_56.ordinal());
+        new VersionedDataOutputStream(new DataOutputStream(baos), 
Version.GFE_56);
     InternalDataSerializer.writeDSFID(msg, dos);
     assertTrue(toDataPre66Called);
     assertFalse(toDataCalled);
@@ -107,7 +108,7 @@ public class BackwardCompatibilitySerializationDUnitTest 
extends JUnit4CacheTest
   @Test
   public void testToDataFromLowerVersionToHigher() throws Exception {
     DataOutputStream dos =
-        new VersionedDataOutputStream(new DataOutputStream(baos), 
Version.GFE_701.ordinal());
+        new VersionedDataOutputStream(new DataOutputStream(baos), 
Version.GFE_701);
     InternalDataSerializer.writeDSFID(msg, dos);
     assertTrue(toDataCalled);
   }
@@ -123,7 +124,7 @@ public class BackwardCompatibilitySerializationDUnitTest 
extends JUnit4CacheTest
     this.bais = new ByteArrayInputStream(baos.toByteArray());
 
     DataInputStream dis =
-        new VersionedDataInputStream(new DataInputStream(bais), 
Version.GFE_701.ordinal());
+        new VersionedDataInputStream(new DataInputStream(bais), 
Version.GFE_701);
     Object o = InternalDataSerializer.basicReadObject(dis);
     assertTrue(o instanceof TestMessage);
     assertTrue(fromDataCalled);
@@ -140,7 +141,7 @@ public class BackwardCompatibilitySerializationDUnitTest 
extends JUnit4CacheTest
     this.bais = new ByteArrayInputStream(baos.toByteArray());
 
     DataInputStream dis =
-        new VersionedDataInputStream(new DataInputStream(bais), 
Version.GFE_56.ordinal());
+        new VersionedDataInputStream(new DataInputStream(bais), 
Version.GFE_56);
     Object o = InternalDataSerializer.basicReadObject(dis);
     assertTrue(o instanceof TestMessage);
     assertTrue(fromDataPre66Called);
@@ -192,7 +193,7 @@ public class BackwardCompatibilitySerializationDUnitTest 
extends JUnit4CacheTest
   }
 
   private void checkSupportForRollingUpgrade(Object ds) {
-    Version[] versions = null;
+    SerializationVersion[] versions = null;
     if (ds instanceof SerializationVersions) {
       versions = ((SerializationVersions) ds).getSerializationVersions();
     }
diff --git 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/ClientCacheFactoryJUnitTest.java
 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/ClientCacheFactoryJUnitTest.java
index fbebdd6..cfa5ee0 100644
--- 
a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/ClientCacheFactoryJUnitTest.java
+++ 
b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/ClientCacheFactoryJUnitTest.java
@@ -356,7 +356,7 @@ public class ClientCacheFactoryJUnitTest {
 
     DataInputStream in =
         new VersionedDataInputStream(new 
ByteArrayInputStream(out.toByteArray()),
-            Version.CURRENT_ORDINAL);
+            Version.CURRENT);
     ClientProxyMembershipID newID = DataSerializer.readObject(in);
     InternalDistributedMember newMemberID =
         (InternalDistributedMember) newID.getDistributedMember();
@@ -374,7 +374,7 @@ public class ClientCacheFactoryJUnitTest {
     DataSerializer.writeObject(clientID, out);
 
     in = new VersionedDataInputStream(new 
ByteArrayInputStream(out.toByteArray()),
-        Version.CURRENT_ORDINAL);
+        Version.CURRENT);
     newID = DataSerializer.readObject(in);
     newMemberID = (InternalDistributedMember) newID.getDistributedMember();
     assertThat(newMemberID.getVersionObject()).isEqualTo(Version.CURRENT);
diff --git 
a/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java
 
b/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java
index cc12370..cddc566 100644
--- 
a/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java
+++ 
b/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java
@@ -66,7 +66,7 @@ public class InternalDataSerializerBenchmark {
   @BenchmarkMode(Mode.Throughput)
   @OutputTimeUnit(TimeUnit.MILLISECONDS)
   public String readStringBenchmark() throws IOException {
-    dataInput.initialize(serializedBytes, Version.CURRENT_ORDINAL);
+    dataInput.initialize(serializedBytes, Version.CURRENT);
     String result = InternalDataSerializer.readString(dataInput, 
DSCODE.STRING_BYTES.toByte());
     return result;
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
index 3a61b0d..4dd4cf8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java
@@ -62,6 +62,7 @@ import org.apache.geode.internal.cache.tier.sockets.Handshake;
 import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus;
 import org.apache.geode.internal.security.SecurityService;
 import org.apache.geode.internal.serialization.ByteArrayDataInput;
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.serialization.VersionedDataInputStream;
 import org.apache.geode.internal.serialization.VersionedDataOutputStream;
 import org.apache.geode.security.AuthenticationFailedException;
@@ -211,11 +212,11 @@ public class ClientSideHandshakeImpl extends Handshake 
implements ClientSideHand
       // Successful handshake for GATEWAY_TO_GATEWAY mode sets the peer 
version in connection
       if (communicationMode.isWAN() && !(acceptanceCode == 
REPLY_EXCEPTION_AUTHENTICATION_REQUIRED
           || acceptanceCode == REPLY_EXCEPTION_AUTHENTICATION_FAILED)) {
-        short wanSiteVersion = Version.readOrdinal(dis);
+        short wanSiteVersion = SerializationVersion.readOrdinal(dis);
         conn.setWanSiteVersion(wanSiteVersion);
         // establish a versioned stream for the other site, if necessary
         if (wanSiteVersion < Version.CURRENT_ORDINAL) {
-          dis = new VersionedDataInputStream(dis, wanSiteVersion);
+          dis = new VersionedDataInputStream(dis, 
Version.fromOrdinalOrCurrent(wanSiteVersion));
         }
       }
 
@@ -234,7 +235,8 @@ public class ClientSideHandshakeImpl extends Handshake 
implements ClientSideHand
       // Read delta-propagation property value from server.
       // [sumedh] Static variable below? Client can connect to different
       // DSes with different values of this. It shoule be a member variable.
-      if (!communicationMode.isWAN() && 
currentClientVersion.compareTo(Version.GFE_61) >= 0) {
+      if (!communicationMode.isWAN() && currentClientVersion.compareTo(
+          Version.GFE_61) >= 0) {
         ((InternalDistributedSystem) 
system).setDeltaEnabledOnServer(dis.readBoolean());
       }
 
@@ -270,7 +272,7 @@ public class ClientSideHandshakeImpl extends Handshake 
implements ClientSideHand
 
     byte[] memberBytes = DataSerializer.readByteArray(p_dis);
     Version v = InternalDataSerializer.getVersionForDataStreamOrNull(p_dis);
-    ByteArrayDataInput dis = new ByteArrayDataInput(memberBytes, v == null ? 0 
: v.ordinal());
+    ByteArrayDataInput dis = new ByteArrayDataInput(memberBytes, v);
     try {
       return DataSerializer.readObject(dis);
     } catch (EOFException e) {
@@ -372,9 +374,9 @@ public class ClientSideHandshakeImpl extends Handshake 
implements ClientSideHand
       hdos.writeByte(communicationMode.getModeNumber());
       if (overrideClientVersion > 0) {
         // for testing
-        Version.writeOrdinal(hdos, overrideClientVersion, true);
+        SerializationVersion.writeOrdinal(hdos, overrideClientVersion, true);
       } else {
-        Version.writeOrdinal(hdos, currentClientVersion.ordinal(), true);
+        SerializationVersion.writeOrdinal(hdos, 
currentClientVersion.ordinal(), true);
       }
 
       hdos.writeByte(replyCode);
@@ -389,7 +391,7 @@ public class ClientSideHandshakeImpl extends Handshake 
implements ClientSideHand
       // we do not know the receiver's version at this point, but the on-wire
       // form of InternalDistributedMember changed in 9.0, so we must serialize
       // it using the previous version
-      DataOutput idOut = new VersionedDataOutputStream(hdos, 
Version.GFE_82.ordinal());
+      DataOutput idOut = new VersionedDataOutputStream(hdos, Version.GFE_82);
       DataSerializer.writeObject(this.id, idOut);
 
       if (currentClientVersion.compareTo(Version.GFE_603) >= 0) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
index 8c0c597..df93ba7 100755
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java
@@ -50,6 +50,7 @@ import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.versions.VersionSource;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
+import org.apache.geode.internal.serialization.SerializationVersion;
 
 /**
  * This is the fundamental representation of a member of a GemFire distributed 
system.
@@ -115,7 +116,8 @@ public class InternalDistributedMember implements 
DistributedMember, Externaliza
 
   /** The versions in which this message was modified */
   @Immutable
-  private static final Version[] dsfidVersions = new Version[] 
{Version.GFE_71, Version.GFE_90};
+  private static final Version[] dsfidVersions = new Version[] {
+      Version.GFE_71, Version.GFE_90};
 
   private void defaultToCurrentHost() {
     netMbr.setProcessId(OSProcess.getId());
@@ -743,7 +745,7 @@ public class InternalDistributedMember implements 
DistributedMember, Externaliza
 
   private short readVersion(int flags, DataInput in) throws IOException {
     if ((flags & VERSION_BIT) != 0) {
-      short version = Version.readOrdinal(in);
+      short version = SerializationVersion.readOrdinal(in);
       this.versionObj = Version.fromOrdinalNoThrow(version, false);
       return version;
     } else {
@@ -799,7 +801,7 @@ public class InternalDistributedMember implements 
DistributedMember, Externaliza
     DataSerializer.writeString(attributes == null ? "" : attributes.getId(), 
out);
     DataSerializer.writeInteger(Integer.valueOf(attributes == null ? 300 : 
attributes.getTimeout()),
         out);
-    Version.writeOrdinal(out, netMbr.getVersionOrdinal(), true);
+    SerializationVersion.writeOrdinal(out, netMbr.getVersionOrdinal(), true);
     netMbr.writeAdditionalData(out);
   }
 
@@ -908,7 +910,7 @@ public class InternalDistributedMember implements 
DistributedMember, Externaliza
         durableClientAttributes == null ? 300 : 
durableClientAttributes.getTimeout()), out);
 
     short version = netMbr.getVersionOrdinal();
-    Version.writeOrdinal(out, version, true);
+    SerializationVersion.writeOrdinal(out, version, true);
   }
 
   public void toDataPre_GFE_7_1_0_0(DataOutput out) throws IOException {
@@ -1100,7 +1102,8 @@ public class InternalDistributedMember implements 
DistributedMember, Externaliza
     netMbr = MemberFactory.newNetMember(inetAddr, hostName, port, sbEnabled, 
elCoord,
         InternalDataSerializer.getVersionForDataStream(in).ordinal(), attr);
 
-    if 
(InternalDataSerializer.getVersionForDataStream(in).compareTo(Version.GFE_90) 
== 0) {
+    if (InternalDataSerializer.getVersionForDataStream(in).compareTo(
+        Version.GFE_90) == 0) {
       netMbr.readAdditionalData(in);
     }
   }
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMember.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMember.java
index f059e64..0b8b996 100755
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMember.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMember.java
@@ -28,6 +28,7 @@ import org.apache.geode.internal.InternalDataSerializer;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.net.SocketCreator;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
+import org.apache.geode.internal.serialization.SerializationVersion;
 
 /**
  * GMSMember is the membership identifier class for Group Membership Services.
@@ -521,7 +522,7 @@ public class GMSMember implements DataSerializableFixedID {
         .writeString(durableId == null ? "" : durableId, out);
     DataSerializer.writeInteger(durableId == null ? 300 : durableTimeout, out);
 
-    Version.writeOrdinal(out, versionOrdinal, true);
+    SerializationVersion.writeOrdinal(out, versionOrdinal, true);
 
     if (versionOrdinal >= Version.GFE_90.ordinal()) {
       writeAdditionalData(out);
@@ -529,7 +530,7 @@ public class GMSMember implements DataSerializableFixedID {
   }
 
   public void writeEssentialData(DataOutput out) throws IOException {
-    Version.writeOrdinal(out, this.versionOrdinal, true);
+    SerializationVersion.writeOrdinal(out, this.versionOrdinal, true);
 
     int flags = 0;
     if (networkPartitionDetectionEnabled)
@@ -543,7 +544,8 @@ public class GMSMember implements DataSerializableFixedID {
     out.writeInt(vmViewId);
     out.writeLong(uuidMSBs);
     out.writeLong(uuidLSBs);
-    if 
(InternalDataSerializer.getVersionForDataStream(out).compareTo(Version.GEODE_1_2_0)
 >= 0) {
+    if (InternalDataSerializer.getVersionForDataStream(out).compareTo(
+        Version.GEODE_1_2_0) >= 0) {
       out.writeByte(vmKind);
     }
   }
@@ -587,7 +589,7 @@ public class GMSMember implements DataSerializableFixedID {
 
   private short readVersion(int flags, DataInput in) throws IOException {
     if ((flags & VERSION_BIT) != 0) {
-      return Version.readOrdinal(in);
+      return SerializationVersion.readOrdinal(in);
     } else {
       // prior to 7.1 member IDs did not serialize their version information
       Version v = InternalDataSerializer.getVersionForDataStreamOrNull(in);
@@ -607,7 +609,7 @@ public class GMSMember implements DataSerializableFixedID {
   }
 
   public void readEssentialData(DataInput in) throws IOException, 
ClassNotFoundException {
-    this.versionOrdinal = Version.readOrdinal(in);
+    this.versionOrdinal = SerializationVersion.readOrdinal(in);
 
     int flags = in.readShort();
     this.networkPartitionDetectionEnabled = (flags & NPD_ENABLED_BIT) != 0;
@@ -623,7 +625,8 @@ public class GMSMember implements DataSerializableFixedID {
     this.vmViewId = in.readInt();
     this.uuidMSBs = in.readLong();
     this.uuidLSBs = in.readLong();
-    if 
(InternalDataSerializer.getVersionForDataStream(in).compareTo(Version.GEODE_1_2_0)
 >= 0) {
+    if (InternalDataSerializer.getVersionForDataStream(in).compareTo(
+        Version.GEODE_1_2_0) >= 0) {
       this.vmKind = in.readByte();
     }
     this.isPartial = true;
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
index 75385d9..305a857 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java
@@ -97,6 +97,7 @@ import 
org.apache.geode.internal.admin.remote.RemoteTransportConfig;
 import org.apache.geode.internal.alerting.AlertingAction;
 import org.apache.geode.internal.cache.DistributedCacheOperation;
 import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.serialization.VersionedDataInputStream;
 import org.apache.geode.internal.tcp.MemberShunnedException;
 
@@ -1002,10 +1003,10 @@ public class JGroupsMessenger implements Messenger {
       DataInputStream dis =
           new DataInputStream(new ByteArrayInputStream(buf, jgmsg.getOffset(), 
jgmsg.getLength()));
 
-      short ordinal = Version.readOrdinal(dis);
+      short ordinal = SerializationVersion.readOrdinal(dis);
 
       if (ordinal < Version.CURRENT_ORDINAL) {
-        dis = new VersionedDataInputStream(dis, ordinal);
+        dis = new VersionedDataInputStream(dis, 
Version.fromOrdinalNoThrow(ordinal, true));
       }
 
       // read
@@ -1101,7 +1102,7 @@ public class JGroupsMessenger implements Messenger {
         DataInputStream in = new DataInputStream(new 
ByteArrayInputStream(data));
 
         if (ordinal < Version.CURRENT_ORDINAL) {
-          in = new VersionedDataInputStream(in, ordinal);
+          in = new VersionedDataInputStream(in, 
Version.fromOrdinalNoThrow(ordinal, true));
         }
 
         GMSMessage result = deserializeMessage(in, ordinal);
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
index 29f0b8e..230b0c9 100644
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java
@@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger;
 
 import org.apache.geode.DataSerializer;
 import org.apache.geode.annotations.internal.MakeNotStatic;
+import org.apache.geode.cache.UnsupportedVersionException;
 import org.apache.geode.distributed.internal.DistributionConfig;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.admin.SSLConfig;
@@ -209,7 +210,7 @@ public class TcpClient {
       out = new DataOutputStream(new 
BufferedOutputStream(sock.getOutputStream()));
 
       if (serverVersion < Version.CURRENT_ORDINAL) {
-        out = new VersionedDataOutputStream(out, serverVersion);
+        out = new VersionedDataOutputStream(out, 
Version.fromOrdinalNoThrow(serverVersion, false));
       }
 
       out.writeInt(gossipVersion);
@@ -221,7 +222,7 @@ public class TcpClient {
 
       if (replyExpected) {
         DataInputStream in = new DataInputStream(sock.getInputStream());
-        in = new VersionedDataInputStream(in, serverVersion);
+        in = new VersionedDataInputStream(in, 
Version.fromOrdinal(serverVersion, false));
         try {
           Object response = DataSerializer.readObject(in);
           logger.debug("received response: {}", response);
@@ -236,6 +237,13 @@ public class TcpClient {
       } else {
         return null;
       }
+    } catch (UnsupportedVersionException ex) {
+      if (logger.isDebugEnabled()) {
+        logger
+            .debug("Remote TcpServer version: " + serverVersion + " is higher 
than local version: "
+                + Version.CURRENT_ORDINAL + ". This is never expected as 
remoteVersion");
+      }
+      return null;
     } finally {
       try {
         if (replyExpected) {
@@ -288,8 +296,7 @@ public class TcpClient {
     try {
       OutputStream outputStream = new 
BufferedOutputStream(sock.getOutputStream());
       DataOutputStream out =
-          new VersionedDataOutputStream(new DataOutputStream(outputStream),
-              Version.GFE_57.ordinal());
+          new VersionedDataOutputStream(new DataOutputStream(outputStream), 
Version.GFE_57);
 
       out.writeInt(gossipVersion);
 
@@ -299,7 +306,7 @@ public class TcpClient {
 
       InputStream inputStream = sock.getInputStream();
       DataInputStream in = new DataInputStream(inputStream);
-      in = new VersionedDataInputStream(in, Version.GFE_57.ordinal());
+      in = new VersionedDataInputStream(in, Version.GFE_57);
       try {
         Object readObject = DataSerializer.readObject(in);
         if (!(readObject instanceof VersionResponse)) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
index 99658d0..42f5150 100755
--- 
a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
+++ 
b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java
@@ -458,7 +458,7 @@ public class TcpServer {
         log.debug("Locator reading request from " + socket.getInetAddress() + 
" with version "
             + Version.fromOrdinal(versionOrdinal, false));
       }
-      input = new VersionedDataInputStream(input, versionOrdinal);
+      input = new VersionedDataInputStream(input, 
Version.fromOrdinal(versionOrdinal, false));
       request = DataSerializer.readObject(input);
       if (log.isDebugEnabled()) {
         log.debug("Locator received request " + request + " from " + 
socket.getInetAddress());
@@ -484,7 +484,7 @@ public class TcpServer {
         DataOutputStream output = new 
DataOutputStream(socket.getOutputStream());
         if (versionOrdinal != Version.CURRENT_ORDINAL) {
           output =
-              new VersionedDataOutputStream(output, versionOrdinal);
+              new VersionedDataOutputStream(output, 
Version.fromOrdinal(versionOrdinal, false));
         }
         DataSerializer.writeObject(response, output);
         output.flush();
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java 
b/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
index 7f16e5c..f99f371 100755
--- 
a/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java
@@ -67,7 +67,7 @@ public class HeapDataOutputStream extends
    *        instead referenced.
    */
   public HeapDataOutputStream(int allocSize, Version version, boolean 
doNotCopy) {
-    super(allocSize, version == null ? Version.CURRENT_ORDINAL : 
version.ordinal(), doNotCopy);
+    super(allocSize, version, doNotCopy);
   }
 
   /**
@@ -75,7 +75,7 @@ public class HeapDataOutputStream extends
    *        instead referenced.
    */
   public HeapDataOutputStream(ByteBuffer initialBuffer, Version version, 
boolean doNotCopy) {
-    super(initialBuffer, version == null ? Version.CURRENT_ORDINAL : 
version.ordinal(), doNotCopy);
+    super(initialBuffer, version, doNotCopy);
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
 
b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
index a6ec1a9..af742fe 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java
@@ -112,6 +112,7 @@ import org.apache.geode.internal.serialization.DSCODE;
 import org.apache.geode.internal.serialization.DSFIDSerializer;
 import org.apache.geode.internal.serialization.DataSerializableFixedID;
 import org.apache.geode.internal.serialization.DscodeHelper;
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.serialization.SerializationVersions;
 import org.apache.geode.internal.serialization.VersionedDataStream;
 import org.apache.geode.internal.util.concurrent.CopyOnWriteHashMap;
@@ -351,7 +352,7 @@ public abstract class InternalDataSerializer extends 
DataSerializer {
     }
     if (out instanceof VersionedDataStream) {
       VersionedDataStream vout = (VersionedDataStream) out;
-      Version version = Version.getVersionForDataStream(vout);
+      Version version = (Version) vout.getVersion();
       if (null != version) {
         if (version.compareTo(Version.GEODE_1_9_0) < 0) {
           if (name.equals(POST_GEODE_190_SERVER_CQIMPL)) {
@@ -2242,7 +2243,7 @@ public abstract class InternalDataSerializer extends 
DataSerializer {
       try {
         ObjectOutput oos = new ObjectOutputStream(stream);
         if (stream instanceof VersionedDataStream) {
-          Version v = Version.getVersionForDataStream((VersionedDataStream) 
stream);
+          Version v = (Version) ((VersionedDataStream) stream).getVersion();
           if (v != null && v != Version.CURRENT) {
             oos = new VersionedObjectOutput(oos, v);
           }
@@ -2280,7 +2281,7 @@ public abstract class InternalDataSerializer extends 
DataSerializer {
 
       if (Version.CURRENT != v && v != null) {
         // get versions where DataOutput was upgraded
-        Version[] versions = null;
+        SerializationVersion[] versions = null;
         if (ds instanceof SerializationVersions) {
           SerializationVersions sv = (SerializationVersions) ds;
           versions = sv.getSerializationVersions();
@@ -2288,7 +2289,7 @@ public abstract class InternalDataSerializer extends 
DataSerializer {
         // check if the version of the peer or diskstore is different and
         // there has been a change in the message
         if (versions != null) {
-          for (Version version : versions) {
+          for (SerializationVersion version : versions) {
             // if peer version is less than the greatest upgraded version
             if (v.compareTo(version) < 0) {
               ds.getClass().getMethod("toDataPre_" + version.getMethodSuffix(),
@@ -2345,7 +2346,7 @@ public abstract class InternalDataSerializer extends 
DataSerializer {
       Version v = InternalDataSerializer.getVersionForDataStreamOrNull(in);
       if (Version.CURRENT != v && v != null) {
         // get versions where DataOutput was upgraded
-        Version[] versions = null;
+        SerializationVersion[] versions = null;
         if (ds instanceof SerializationVersions) {
           SerializationVersions vds = (SerializationVersions) ds;
           versions = vds.getSerializationVersions();
@@ -2353,7 +2354,7 @@ public abstract class InternalDataSerializer extends 
DataSerializer {
         // check if the version of the peer or diskstore is different and
         // there has been a change in the message
         if (versions != null) {
-          for (Version version : versions) {
+          for (SerializationVersion version : versions) {
             // if peer version is less than the greatest upgraded version
             if (v.compareTo(version) < 0) {
               ds.getClass().getMethod("fromDataPre" + '_' + 
version.getMethodSuffix(),
@@ -2409,12 +2410,13 @@ public abstract class InternalDataSerializer extends 
DataSerializer {
   }
 
   /**
-   * Get the {@link Version} of the peer or disk store that created this 
{@link DataInput}.
+   * Get the {@link org.apache.geode.internal.Version} of the peer or disk 
store that created this
+   * {@link DataInput}.
    */
   public static Version getVersionForDataStream(DataInput in) {
     // check if this is a versioned data input
     if (in instanceof VersionedDataStream) {
-      final Version v = Version.getVersionForDataStream((VersionedDataStream) 
in);
+      final Version v = (Version) ((VersionedDataStream) in).getVersion();
       return v != null ? v : Version.CURRENT;
     } else {
       // assume latest version
@@ -2423,13 +2425,14 @@ public abstract class InternalDataSerializer extends 
DataSerializer {
   }
 
   /**
-   * Get the {@link Version} of the peer or disk store that created this 
{@link DataInput}. Returns
+   * Get the {@link org.apache.geode.internal.Version} of the peer or disk 
store that created this
+   * {@link DataInput}. Returns
    * null if the version is same as this member's.
    */
   public static Version getVersionForDataStreamOrNull(DataInput in) {
     // check if this is a versioned data input
     if (in instanceof VersionedDataStream) {
-      return Version.getVersionForDataStream((VersionedDataStream) in);
+      return (Version) ((VersionedDataStream) in).getVersion();
     } else {
       // assume latest version
       return null;
@@ -2437,12 +2440,13 @@ public abstract class InternalDataSerializer extends 
DataSerializer {
   }
 
   /**
-   * Get the {@link Version} of the peer or disk store that created this 
{@link DataOutput}.
+   * Get the {@link org.apache.geode.internal.Version} of the peer or disk 
store that created this
+   * {@link DataOutput}.
    */
   public static Version getVersionForDataStream(DataOutput out) {
     // check if this is a versioned data output
     if (out instanceof VersionedDataStream) {
-      final Version v = Version.getVersionForDataStream((VersionedDataStream) 
out);
+      final Version v = (Version) ((VersionedDataStream) out).getVersion();
       return v != null ? v : Version.CURRENT;
     } else {
       // assume latest version
@@ -2451,13 +2455,14 @@ public abstract class InternalDataSerializer extends 
DataSerializer {
   }
 
   /**
-   * Get the {@link Version} of the peer or disk store that created this 
{@link DataOutput}. Returns
+   * Get the {@link org.apache.geode.internal.Version} of the peer or disk 
store that created this
+   * {@link DataOutput}. Returns
    * null if the version is same as this member's.
    */
   public static Version getVersionForDataStreamOrNull(DataOutput out) {
     // check if this is a versioned data output
     if (out instanceof VersionedDataStream) {
-      return Version.getVersionForDataStream((VersionedDataStream) out);
+      return (Version) ((VersionedDataStream) out).getVersion();
     } else {
       // assume latest version
       return null;
@@ -2816,7 +2821,7 @@ public abstract class InternalDataSerializer extends 
DataSerializer {
       ObjectInput ois = new DSObjectInputStream(stream);
       serializationFilter.setFilterOn((ObjectInputStream) ois);
       if (stream instanceof VersionedDataStream) {
-        Version v = Version.getVersionForDataStream((VersionedDataStream) 
stream);
+        Version v = (Version) ((VersionedDataStream) stream).getVersion();
         if (Version.CURRENT != v && v != null) {
           ois = new VersionedObjectInput(ois, v);
         }
diff --git a/geode-core/src/main/java/org/apache/geode/internal/Version.java 
b/geode-core/src/main/java/org/apache/geode/internal/Version.java
index 75f6a59..71243ea 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/Version.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/Version.java
@@ -18,15 +18,13 @@ package org.apache.geode.internal;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
-import java.io.InputStream;
-import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.stream.Collectors;
 
 import org.apache.geode.annotations.Immutable;
 import org.apache.geode.cache.UnsupportedVersionException;
 import org.apache.geode.internal.cache.tier.sockets.CommandInitializer;
-import org.apache.geode.internal.serialization.VersionedDataStream;
+import org.apache.geode.internal.serialization.SerializationVersion;
 
 /**
  * Enumerated type for client / server and p2p version.
@@ -38,7 +36,7 @@ import 
org.apache.geode.internal.serialization.VersionedDataStream;
  * @since GemFire 5.7
  */
 @Immutable
-public class Version implements Comparable<Version> {
+public class Version extends SerializationVersion {
 
   /** The name of this version */
   private final transient String name;
@@ -55,20 +53,11 @@ public class Version implements Comparable<Version> {
   private final byte release;
   private final byte patch;
 
-  /** byte used as ordinal to represent this <code>Version</code> */
-  private final short ordinal;
-
   public static final int HIGHEST_VERSION = 107;
 
   @Immutable
   private static final Version[] VALUES = new Version[HIGHEST_VERSION + 1];
 
-  /**
-   * Reserved token that cannot be used for product version but as a flag in 
internal contexts.
-   */
-  private static final byte TOKEN_ORDINAL = -1;
-  private static final int TOKEN_ORDINAL_INT = (TOKEN_ORDINAL & 0xFF);
-
   @Immutable
   public static final Version TOKEN =
       new Version("", "TOKEN", (byte) -1, (byte) 0, (byte) 0, (byte) 0, 
TOKEN_ORDINAL);
@@ -309,13 +298,13 @@ public class Version implements Comparable<Version> {
   /** Creates a new instance of <code>Version</code> */
   private Version(String product, String name, byte major, byte minor, byte 
release, byte patch,
       byte ordinal) {
+    super(ordinal);
     this.productName = product;
     this.name = name;
     this.majorVersion = major;
     this.minorVersion = minor;
     this.release = release;
     this.patch = patch;
-    this.ordinal = ordinal;
     this.methodSuffix = this.productName + "_" + this.majorVersion + "_" + 
this.minorVersion + "_"
         + this.release + "_" + this.patch;
     if (ordinal != TOKEN_ORDINAL) {
@@ -357,42 +346,19 @@ public class Version implements Comparable<Version> {
   }
 
   /**
-   * Write the given ordinal (result of {@link #ordinal()}) to given {@link 
DataOutput}. This keeps
-   * the serialization of ordinal compatible with previous versions writing a 
single byte to
-   * DataOutput when possible, and a token with 2 bytes if it is large.
-   *
-   * @param out the {@link DataOutput} to write the ordinal write to
-   * @param ordinal the version to be written
-   * @param compressed if true, then use single byte for ordinal < 128, and 
three bytes for beyond
-   *        that, else always use three bytes where the first byte is {@link 
#TOKEN_ORDINAL}; former
-   *        mode is useful for interoperatibility with previous versions while 
latter to use fixed
-   *        size for writing version; typically former will be used for 
P2P/client-server
-   *        communications while latter for persisting to disk; we use the 
token to ensure that
-   *        {@link #readOrdinal(DataInput)} can deal with both 
compressed/uncompressed cases
-   *        seemlessly
-   */
-  public static void writeOrdinal(DataOutput out, short ordinal, boolean 
compressed)
-      throws IOException {
-    if (compressed && ordinal <= Byte.MAX_VALUE) {
-      out.writeByte(ordinal);
-    } else {
-      out.writeByte(TOKEN_ORDINAL);
-      out.writeShort(ordinal);
-    }
-  }
-
-  /**
    * Write this {@link Version}'s ordinal (result of {@link #ordinal()}) to 
given
    * {@link DataOutput}. This keeps the serialization of ordinal compatible 
with previous versions
    * writing a single byte to DataOutput when possible, and a token with 2 
bytes if it is large.
    *
    * @param out the {@link DataOutput} to write the ordinal write to
    * @param compressed if true, then use single byte for ordinal < 128, and 
three bytes for beyond
-   *        that, else always use three bytes where the first byte is {@link 
#TOKEN_ORDINAL}; former
+   *        that, else always use three bytes where the first byte is
+   *        {@link SerializationVersion#TOKEN_ORDINAL}; former
    *        mode is useful for interoperatibility with previous versions while 
latter to use fixed
    *        size for writing version; typically former will be used for 
P2P/client-server
    *        communications while latter for persisting to disk; we use the 
token to ensure that
-   *        {@link #readOrdinal(DataInput)} can deal with both 
compressed/uncompressed cases
+   *        {@link SerializationVersion#readOrdinal(DataInput)} can deal with 
both
+   *        compressed/uncompressed cases
    *        seemlessly
    */
   public void writeOrdinal(DataOutput out, boolean compressed) throws 
IOException {
@@ -400,37 +366,6 @@ public class Version implements Comparable<Version> {
   }
 
   /**
-   * Write the given ordinal (result of {@link #ordinal()}) to given {@link 
ByteBuffer}. This keeps
-   * the serialization of ordinal compatible with previous versions writing a 
single byte to
-   * DataOutput when possible, and a token with 2 bytes if it is large.
-   *
-   * @param buffer the {@link ByteBuffer} to write the ordinal write to
-   * @param ordinal the version to be written
-   * @param compressed if true, then use single byte for ordinal < 128, and 
three bytes for beyond
-   *        that, else always use three bytes where the first byte is {@link 
#TOKEN_ORDINAL}
-   */
-  public static void writeOrdinal(ByteBuffer buffer, short ordinal, boolean 
compressed) {
-    if (compressed && ordinal <= Byte.MAX_VALUE) {
-      buffer.put((byte) ordinal);
-    } else {
-      buffer.put(TOKEN_ORDINAL);
-      buffer.putShort(ordinal);
-    }
-  }
-
-  /**
-   * Reads ordinal as written by {@link #writeOrdinal} from given {@link 
DataInput}.
-   */
-  public static short readOrdinal(DataInput in) throws IOException {
-    final byte ordinal = in.readByte();
-    if (ordinal != TOKEN_ORDINAL) {
-      return ordinal;
-    } else {
-      return in.readShort();
-    }
-  }
-
-  /**
    * Return the <code>Version</code> reading from given {@link DataInput} as 
serialized by
    * {@link #writeOrdinal(DataOutput, boolean)}.
    *
@@ -451,6 +386,7 @@ public class Version implements Comparable<Version> {
     return fromOrdinalNoThrow(readOrdinal(in), returnNullForCurrent);
   }
 
+
   /**
    * Return the <code>Version</code> represented by specified ordinal while 
not throwing exception
    * if given ordinal is higher than any known ones or does not map to an 
actual Version instance
@@ -466,30 +402,6 @@ public class Version implements Comparable<Version> {
     return VALUES[ordinal];
   }
 
-  /**
-   * Reads ordinal as written by {@link #writeOrdinal} from given {@link 
InputStream}. Returns -1 on
-   * end of stream.
-   */
-  public static short readOrdinalFromInputStream(InputStream is) throws 
IOException {
-    final int ordinal = is.read();
-    if (ordinal != -1) {
-      if (ordinal != TOKEN_ORDINAL_INT) {
-        return (short) ordinal;
-      } else {
-        // two byte ordinal
-        final int ordinalPart1 = is.read();
-        final int ordinalPart2 = is.read();
-        if ((ordinalPart1 | ordinalPart2) >= 0) {
-          return (short) ((ordinalPart1 << 8) | ordinalPart2);
-        } else {
-          return -1;
-        }
-      }
-    } else {
-      return -1;
-    }
-  }
-
   public String getMethodSuffix() {
     return this.methodSuffix;
   }
@@ -548,21 +460,6 @@ public class Version implements Comparable<Version> {
   }
 
   /**
-   * {@inheritDoc}
-   */
-  @Override
-  public int compareTo(Version other) {
-    if (other != null) {
-      // byte min/max can't overflow int, so use (a-b)
-      final int thisOrdinal = this.ordinal;
-      final int otherOrdinal = other.ordinal;
-      return (thisOrdinal - otherOrdinal);
-    } else {
-      return 1;
-    }
-  }
-
-  /**
    * Returns a string representation for this <code>Version</code>.
    *
    * @return the name of this operation.
@@ -622,15 +519,4 @@ public class Version implements Comparable<Version> {
         .collect(Collectors.toList());
   }
 
-  public static Version getVersionForDataStream(VersionedDataStream input) {
-    try {
-      short ordinal = input.getVersionOrdinal();
-      if (ordinal <= 0) {
-        return null;
-      }
-      return fromOrdinal(ordinal, false);
-    } catch (UnsupportedVersionException e) {
-      return null;
-    }
-  }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectInput.java 
b/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectInput.java
index 73b79d7..8ef654f 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectInput.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectInput.java
@@ -18,6 +18,7 @@ package org.apache.geode.internal;
 import java.io.IOException;
 import java.io.ObjectInput;
 
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.serialization.VersionedDataStream;
 
 /**
@@ -46,11 +47,8 @@ public class VersionedObjectInput implements ObjectInput, 
VersionedDataStream {
    * {@inheritDoc}
    */
   @Override
-  public short getVersionOrdinal() {
-    if (version == null) {
-      return Version.CURRENT_ORDINAL;
-    }
-    return this.version.ordinal();
+  public SerializationVersion getVersion() {
+    return this.version;
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectOutput.java 
b/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectOutput.java
index e8d44ff..8d025d8 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectOutput.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectOutput.java
@@ -18,6 +18,7 @@ package org.apache.geode.internal;
 import java.io.IOException;
 import java.io.ObjectOutput;
 
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.serialization.VersionedDataStream;
 
 /**
@@ -49,11 +50,8 @@ public class VersionedObjectOutput implements ObjectOutput, 
VersionedDataStream
    * {@inheritDoc}
    */
   @Override
-  public short getVersionOrdinal() {
-    if (version == null) {
-      return Version.CURRENT_ORDINAL;
-    }
-    return this.version.ordinal();
+  public SerializationVersion getVersion() {
+    return this.version;
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java
index 7cf2d51..67affe0 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java
@@ -81,6 +81,7 @@ import 
org.apache.geode.internal.cache.versions.RegionVersionVector;
 import org.apache.geode.internal.concurrent.ConcurrentHashSet;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.serialization.SerializationVersion;
 
 /**
  * Does all the IF file work for a DiskStoreImpl.
@@ -2794,7 +2795,7 @@ public class DiskInitFile implements 
DiskInitFileInterpreter {
     try {
       ByteBuffer bb = getIFWriteBuffer(1 + 3 + 1);
       bb.put(IFREC_GEMFIRE_VERSION);
-      Version.writeOrdinal(bb, version.ordinal(), false);
+      SerializationVersion.writeOrdinal(bb, version.ordinal(), false);
       bb.put(END_OF_RECORD_ID);
       writeIFRecord(bb, false); // don't do stats for these small records
     } catch (IOException ex) {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
index 5d26919..5e08365 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java
@@ -315,7 +315,7 @@ public class EventID implements DataSerializableFixedID, 
Serializable, Externali
       disVersion = Version.GFE_90;
     }
     ByteArrayDataInput dis =
-        new ByteArrayDataInput(membershipID, disVersion == null ? 0 : 
disVersion.ordinal());
+        new ByteArrayDataInput(membershipID, disVersion);
     InternalDistributedMember result = null;
     try {
       result = InternalDistributedMember.readEssentialData(dis);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
index 9a33a06..f227aba 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java
@@ -486,7 +486,7 @@ public class FilterRoutingInfo implements 
VersionedDataSerializable {
     private void deserialize() {
       try {
         ByteArrayDataInput dis =
-            new ByteArrayDataInput(myData, myDataVersion == null ? 0 : 
myDataVersion.ordinal());
+            new ByteArrayDataInput(myData, myDataVersion);
         boolean hasCQs = dis.readBoolean();
         if (hasCQs) {
           int numEntries = InternalDataSerializer.readArrayLength(dis);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java 
b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
index 0675ecc..035f61d 100755
--- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java
@@ -108,6 +108,7 @@ import 
org.apache.geode.internal.offheap.annotations.Released;
 import org.apache.geode.internal.offheap.annotations.Retained;
 import org.apache.geode.internal.sequencelog.EntryLogger;
 import org.apache.geode.internal.serialization.ByteArrayDataInput;
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.shared.NativeCalls;
 import org.apache.geode.internal.util.BlobHelper;
 import org.apache.geode.pdx.internal.PdxWriterImpl;
@@ -2071,7 +2072,7 @@ public class Oplog implements CompactableOplog, Flushable 
{
 
   private Version readProductVersionRecord(DataInput dis, File f) throws 
IOException {
     Version recoveredGFVersion;
-    short ver = Version.readOrdinal(dis);
+    short ver = SerializationVersion.readOrdinal(dis);
     try {
       recoveredGFVersion = Version.fromOrdinal(ver, false);
     } catch (UnsupportedVersionException e) {
@@ -6362,7 +6363,8 @@ public class Oplog implements CompactableOplog, Flushable 
{
   }
 
   /**
-   * If this OpLog is from an older version of the product, then return that 
{@link Version} else
+   * If this OpLog is from an older version of the product, then return that
+   * {@link org.apache.geode.internal.Version} else
    * return null.
    */
   public Version getProductVersionIfOld() {
@@ -6385,7 +6387,7 @@ public class Oplog implements CompactableOplog, Flushable 
{
 
   /**
    * If this OpLog has data that was written by an older version of the 
product, then return that
-   * {@link Version} else return null.
+   * {@link org.apache.geode.internal.Version} else return null.
    */
   public Version getDataVersionIfOld() {
     final Version version = this.dataVersion;
@@ -6574,7 +6576,7 @@ public class Oplog implements CompactableOplog, Flushable 
{
         flushNoSync(olf);
       }
       // don't compress since we setup fixed size of buffers
-      Version.writeOrdinal(bb, ordinal, false);
+      SerializationVersion.writeOrdinal(bb, ordinal, false);
     }
 
     private void writeInt(OplogFile olf, int v) throws IOException {
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskInitFileParser.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskInitFileParser.java
index 5350a90..d595a92 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskInitFileParser.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskInitFileParser.java
@@ -45,6 +45,7 @@ import org.apache.geode.internal.cache.ProxyBucketRegion;
 import org.apache.geode.internal.cache.versions.RegionVersionHolder;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.logging.log4j.LogMarker;
+import org.apache.geode.internal.serialization.SerializationVersion;
 
 public class DiskInitFileParser {
   private static final Logger logger = LogService.getLogger();
@@ -426,7 +427,7 @@ public class DiskInitFileParser {
         }
           break;
         case DiskInitFile.IFREC_GEMFIRE_VERSION: {
-          short ver = Version.readOrdinal(dis);
+          short ver = SerializationVersion.readOrdinal(dis);
           readEndOfRecord(dis);
           if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) {
             logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, 
"IFREC_GEMFIRE_VERSION version={}",
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationMetadata.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationMetadata.java
index 2a6e073..637f2c6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationMetadata.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationMetadata.java
@@ -29,6 +29,7 @@ import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.logging.LogService;
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.serialization.VersionedDataInputStream;
 import org.apache.geode.internal.serialization.VersionedDataOutputStream;
 
@@ -58,9 +59,9 @@ class ClientRegistrationMetadata {
         unversionedDataOutputStream)) {
       if (oldClientRequiresVersionedStreams(clientVersion)) {
         dataInputStream =
-            new VersionedDataInputStream(unversionedDataInputStream, 
clientVersion.ordinal());
+            new VersionedDataInputStream(unversionedDataInputStream, 
clientVersion);
         dataOutputStream =
-            new VersionedDataOutputStream(unversionedDataOutputStream, 
clientVersion.ordinal());
+            new VersionedDataOutputStream(unversionedDataOutputStream, 
clientVersion);
       } else {
         dataInputStream = unversionedDataInputStream;
         dataOutputStream = unversionedDataOutputStream;
@@ -113,7 +114,7 @@ class ClientRegistrationMetadata {
   private boolean getAndValidateClientVersion(final Socket socket,
       final DataInputStream dataInputStream, final DataOutputStream 
dataOutputStream)
       throws IOException {
-    short clientVersionOrdinal = Version.readOrdinal(dataInputStream);
+    short clientVersionOrdinal = 
SerializationVersion.readOrdinal(dataInputStream);
 
     try {
       clientVersion = Version.fromOrdinal(clientVersionOrdinal, true);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeFactory.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeFactory.java
index 387c741..5f793e2 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeFactory.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeFactory.java
@@ -32,6 +32,7 @@ import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.cache.tier.ServerSideHandshake;
 import org.apache.geode.internal.logging.LogService;
 import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.internal.serialization.SerializationVersion;
 
 class ServerSideHandshakeFactory {
   private static final Logger logger = LogService.getLogger();
@@ -64,7 +65,7 @@ class ServerSideHandshakeFactory {
       soTimeout = socket.getSoTimeout();
       socket.setSoTimeout(timeout);
       InputStream is = socket.getInputStream();
-      short clientVersionOrdinal = Version.readOrdinalFromInputStream(is);
+      short clientVersionOrdinal = 
SerializationVersion.readOrdinalFromInputStream(is);
       if (clientVersionOrdinal == -1) {
         throw new EOFException(
             "HandShakeReader: EOF reached before client version could be 
read");
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java
index c4a06b9..624cde3 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java
@@ -35,6 +35,7 @@ import org.apache.geode.internal.cache.tier.CommunicationMode;
 import org.apache.geode.internal.cache.tier.Encryptor;
 import org.apache.geode.internal.cache.tier.ServerSideHandshake;
 import org.apache.geode.internal.security.SecurityService;
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.serialization.VersionedDataInputStream;
 import org.apache.geode.internal.serialization.VersionedDataOutputStream;
 import org.apache.geode.internal.serialization.VersionedDataStream;
@@ -87,9 +88,9 @@ public class ServerSideHandshakeImpl extends Handshake 
implements ServerSideHand
         this.clientReadTimeout = dataInputStream.readInt();
         if (clientVersion.compareTo(Version.CURRENT) < 0) {
           // versioned streams allow object serialization code to deal with 
older clients
-          dataInputStream = new VersionedDataInputStream(dataInputStream, 
clientVersion.ordinal());
+          dataInputStream = new VersionedDataInputStream(dataInputStream, 
clientVersion);
           dataOutputStream =
-              new VersionedDataOutputStream(dataOutputStream, 
clientVersion.ordinal());
+              new VersionedDataOutputStream(dataOutputStream, clientVersion);
         }
         this.id = ClientProxyMembershipID.readCanonicalized(dataInputStream);
         // Note: credentials should always be the last piece in handshake for
@@ -134,8 +135,8 @@ public class ServerSideHandshakeImpl extends Handshake 
implements ServerSideHand
     DataOutputStream dos = new DataOutputStream(out);
     DataInputStream dis;
     if (clientVersion.compareTo(Version.CURRENT) < 0) {
-      dis = new VersionedDataInputStream(in, clientVersion.ordinal());
-      dos = new VersionedDataOutputStream(dos, clientVersion.ordinal());
+      dis = new VersionedDataInputStream(in, clientVersion);
+      dos = new VersionedDataOutputStream(dos, clientVersion);
     } else {
       dis = new DataInputStream(in);
     }
@@ -149,7 +150,7 @@ public class ServerSideHandshakeImpl extends Handshake 
implements ServerSideHand
 
     // additional byte of wan site needs to send for Gateway BC
     if (communicationMode.isWAN()) {
-      Version.writeOrdinal(dos, currentServerVersion.ordinal(), true);
+      SerializationVersion.writeOrdinal(dos, currentServerVersion.ordinal(), 
true);
     }
 
     dos.writeByte(endpointType);
@@ -160,7 +161,7 @@ public class ServerSideHandshakeImpl extends Handshake 
implements ServerSideHand
 
     Version v = Version.CURRENT;
     if (dos instanceof VersionedDataStream) {
-      v = Version.getVersionForDataStream((VersionedDataStream) dos);
+      v = (Version) ((VersionedDataStream) dos).getVersion();
     }
     HeapDataOutputStream hdos = new HeapDataOutputStream(v);
     DataSerializer.writeObject(member, hdos);
@@ -171,7 +172,8 @@ public class ServerSideHandshakeImpl extends Handshake 
implements ServerSideHand
     dos.writeUTF("");
 
     // Write delta-propagation property value if this is not WAN.
-    if (!communicationMode.isWAN() && 
this.clientVersion.compareTo(Version.GFE_61) >= 0) {
+    if (!communicationMode.isWAN() && this.clientVersion.compareTo(
+        Version.GFE_61) >= 0) {
       dos.writeBoolean(((InternalDistributedSystem) 
this.system).getConfig().getDeltaPropagation());
     }
 
@@ -189,7 +191,8 @@ public class ServerSideHandshakeImpl extends Handshake 
implements ServerSideHand
           .getDistributedSystemId());
     }
 
-    if ((communicationMode.isWAN()) && 
this.clientVersion.compareTo(Version.GFE_80) >= 0
+    if ((communicationMode.isWAN()) && this.clientVersion.compareTo(
+        Version.GFE_80) >= 0
         && currentServerVersion.compareTo(Version.GFE_80) >= 0) {
       int pdxSize = PeerTypeRegistration.getPdxRegistrySize();
       dos.writeInt(pdxSize);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
index d32fd50..a9abf34 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java
@@ -735,7 +735,7 @@ public class GatewaySenderEventImpl
     // this._id = in.readUTF();
     if (version < 0x11 && (in instanceof InputStream)
         && InternalDataSerializer.getVersionForDataStream(in) == 
Version.CURRENT) {
-      in = new VersionedDataInputStream((InputStream) in, 
Version.GFE_701.ordinal());
+      in = new VersionedDataInputStream((InputStream) in, Version.GFE_701);
     }
     this.id = (EventID) DataSerializer.readObject(in);
     // TODO:Asif ; Check if this violates Barry's logic of not assiging VM
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/BufferDataOutputStream.java
 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/BufferDataOutputStream.java
index 20f336b..245b286 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/BufferDataOutputStream.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/BufferDataOutputStream.java
@@ -43,7 +43,7 @@ public class BufferDataOutputStream extends OutputStream 
implements VersionedDat
   protected LinkedList<ByteBuffer> chunks = null;
   protected int size = 0;
   protected boolean ignoreWrites = false; // added for bug 39569
-  protected short version;
+  protected SerializationVersion version;
   protected boolean doNotCopy;
   protected ByteBuffer buffer;
   /**
@@ -55,7 +55,7 @@ public class BufferDataOutputStream extends OutputStream 
implements VersionedDat
   private Error expansionException = null;
   private int memoPosition;
 
-  public BufferDataOutputStream(int allocSize, short version) {
+  public BufferDataOutputStream(int allocSize, SerializationVersion version) {
     this(allocSize, version, false);
   }
 
@@ -80,7 +80,7 @@ public class BufferDataOutputStream extends OutputStream 
implements VersionedDat
    * @param doNotCopy if true then byte arrays/buffers/sources will not be 
copied to this hdos but
    *        instead referenced.
    */
-  public BufferDataOutputStream(int allocSize, short version, boolean 
doNotCopy) {
+  public BufferDataOutputStream(int allocSize, SerializationVersion version, 
boolean doNotCopy) {
     if (allocSize < SMALLEST_CHUNK_SIZE) {
       this.MIN_CHUNK_SIZE = SMALLEST_CHUNK_SIZE;
     } else {
@@ -91,7 +91,8 @@ public class BufferDataOutputStream extends OutputStream 
implements VersionedDat
     this.doNotCopy = doNotCopy;
   }
 
-  public BufferDataOutputStream(ByteBuffer initialBuffer, short version, 
boolean doNotCopy) {
+  public BufferDataOutputStream(ByteBuffer initialBuffer, SerializationVersion 
version,
+      boolean doNotCopy) {
     if (initialBuffer.position() != 0) {
       initialBuffer = initialBuffer.slice();
     }
@@ -151,7 +152,7 @@ public class BufferDataOutputStream extends OutputStream 
implements VersionedDat
    * {@inheritDoc}
    */
   @Override
-  public short getVersionOrdinal() {
+  public SerializationVersion getVersion() {
     return version;
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/ByteArrayDataInput.java
 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/ByteArrayDataInput.java
index 20d5f2f..23fff7a 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/ByteArrayDataInput.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/ByteArrayDataInput.java
@@ -35,7 +35,7 @@ public class ByteArrayDataInput extends InputStream 
implements DataInput, Versio
   private int pos;
   /** reusable buffer for readUTF */
   private char[] charBuf;
-  private short version;
+  private SerializationVersion version;
 
   /**
    * Create a {@link DataInput} whose contents are empty.
@@ -43,10 +43,10 @@ public class ByteArrayDataInput extends InputStream 
implements DataInput, Versio
   public ByteArrayDataInput() {}
 
   public ByteArrayDataInput(byte[] bytes) {
-    initialize(bytes, (short) 0);
+    initialize(bytes, null);
   }
 
-  public ByteArrayDataInput(byte[] bytes, short version) {
+  public ByteArrayDataInput(byte[] bytes, SerializationVersion version) {
     initialize(bytes, version);
   }
 
@@ -57,18 +57,18 @@ public class ByteArrayDataInput extends InputStream 
implements DataInput, Versio
    *        (a copy is not made) so it should not be changed externally.
    * @param version the product version that serialized the object on given 
bytes
    */
-  public void initialize(byte[] bytes, int version) {
+  public void initialize(byte[] bytes, SerializationVersion version) {
     this.bytes = bytes;
     this.nBytes = bytes.length;
     this.pos = 0;
-    this.version = (short) version;
+    this.version = version;
   }
 
   /**
    * {@inheritDoc}
    */
   @Override
-  public short getVersionOrdinal() {
+  public SerializationVersion getVersion() {
     return version;
   }
 
@@ -463,7 +463,7 @@ public class ByteArrayDataInput extends InputStream 
implements DataInput, Versio
     this.bytes = null;
     this.nBytes = 0;
     this.pos = 0;
-    this.version = 0;
+    this.version = null;
   }
 
   /**
@@ -471,7 +471,8 @@ public class ByteArrayDataInput extends InputStream 
implements DataInput, Versio
    */
   @Override
   public String toString() {
-    return this.version == 0 ? super.toString() : (super.toString() + " (v" + 
this.version + ')');
+    return this.version == null ? super.toString()
+        : (super.toString() + " (v" + this.version + ')');
   }
 
   private void throwUTFEncodingError(int index, int char1, int char2, Integer 
char3, int enc)
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/DSFIDSerializer.java
 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/DSFIDSerializer.java
index 37cd5a7..d538a02 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/DSFIDSerializer.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/DSFIDSerializer.java
@@ -129,11 +129,11 @@ public class DSFIDSerializer {
       if (Version.CURRENT != v && v != null) {
         // get versions where DataOutput was upgraded
         SerializationVersions sv = (SerializationVersions) ds;
-        Version[] versions = sv.getSerializationVersions();
+        SerializationVersion[] versions = sv.getSerializationVersions();
         // check if the version of the peer or diskstore is different and
         // there has been a change in the message
         if (versions != null) {
-          for (Version version : versions) {
+          for (SerializationVersion version : versions) {
             // if peer version is less than the greatest upgraded version
             if (v.compareTo(version) < 0) {
               ds.getClass().getMethod("toDataPre_" + version.getMethodSuffix(),
@@ -181,13 +181,12 @@ public class DSFIDSerializer {
    * Returns
    * zero if the version is same as this member's.
    */
-  public short getVersionForDataStreamOrZero(DataOutput out) {
+  public SerializationVersion getVersionForDataStreamOrNull(DataOutput out) {
     // check if this is a versioned data output
     if (out instanceof VersionedDataStream) {
-      return ((VersionedDataStream) out).getVersionOrdinal();
+      return ((VersionedDataStream) out).getVersion();
     } else {
-      // assume latest version
-      return 0;
+      return null;
     }
   }
 
@@ -262,13 +261,13 @@ public class DSFIDSerializer {
       Version v = InternalDataSerializer.getVersionForDataStreamOrNull(in);
       if (Version.CURRENT != v && v != null) {
         // get versions where DataOutput was upgraded
-        Version[] versions = null;
+        SerializationVersion[] versions = null;
         SerializationVersions vds = (SerializationVersions) ds;
         versions = vds.getSerializationVersions();
         // check if the version of the peer or diskstore is different and
         // there has been a change in the message
         if (versions != null) {
-          for (Version version : versions) {
+          for (SerializationVersion version : versions) {
             // if peer version is less than the greatest upgraded version
             if (v.compareTo(version) < 0) {
               ds.getClass().getMethod("fromDataPre" + '_' + 
version.getMethodSuffix(),
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersion.java
 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersion.java
new file mode 100644
index 0000000..4f0cbda
--- /dev/null
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersion.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more 
contributor license
+ * agreements. See the NOTICE file distributed with this work for additional 
information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache 
License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the 
License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software 
distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
KIND, either express
+ * or implied. See the License for the specific language governing permissions 
and limitations under
+ * the License.
+ */
+package org.apache.geode.internal.serialization;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public class SerializationVersion implements Comparable<SerializationVersion> {
+
+  /**
+   * Reserved token that cannot be used for product version but as a flag in 
internal contexts.
+   */
+  protected static final byte TOKEN_ORDINAL = -1;
+  protected static final int TOKEN_ORDINAL_INT = (TOKEN_ORDINAL & 0xFF);
+
+  /** value used as ordinal to represent this 
<code>SerializationVersion</code> */
+  protected final short ordinal;
+
+  public SerializationVersion(int ordinal) {
+    this.ordinal = (short) ordinal;
+  }
+
+  /**
+   * Write the given ordinal (result of ordinal()) to given {@link 
DataOutput}. This keeps
+   * the serialization of ordinal compatible with previous versions writing a 
single byte to
+   * DataOutput when possible, and a token with 2 bytes if it is large.
+   *
+   * @param out the {@link DataOutput} to write the ordinal write to
+   * @param ordinal the version to be written
+   * @param compressed if true, then use single byte for ordinal < 128, and 
three bytes for beyond
+   *        that, else always use three bytes where the first byte is {@link 
#TOKEN_ORDINAL}; former
+   *        mode is useful for interoperatibility with previous versions while 
latter to use fixed
+   *        size for writing version; typically former will be used for 
P2P/client-server
+   *        communications while latter for persisting to disk; we use the 
token to ensure that
+   *        {@link #readOrdinal(DataInput)} can deal with both 
compressed/uncompressed cases
+   *        seemlessly
+   */
+  public static void writeOrdinal(DataOutput out, short ordinal, boolean 
compressed)
+      throws IOException {
+    if (compressed && ordinal <= Byte.MAX_VALUE) {
+      out.writeByte(ordinal);
+    } else {
+      out.writeByte(TOKEN_ORDINAL);
+      out.writeShort(ordinal);
+    }
+  }
+
+  /**
+   * Write the given ordinal (result of ordinal()) to given {@link 
ByteBuffer}. This keeps
+   * the serialization of ordinal compatible with previous versions writing a 
single byte to
+   * DataOutput when possible, and a token with 2 bytes if it is large.
+   *
+   * @param buffer the {@link ByteBuffer} to write the ordinal write to
+   * @param ordinal the version to be written
+   * @param compressed if true, then use single byte for ordinal < 128, and 
three bytes for beyond
+   *        that, else always use three bytes where the first byte is {@link 
#TOKEN_ORDINAL}
+   */
+  public static void writeOrdinal(ByteBuffer buffer, short ordinal, boolean 
compressed) {
+    if (compressed && ordinal <= Byte.MAX_VALUE) {
+      buffer.put((byte) ordinal);
+    } else {
+      buffer.put(TOKEN_ORDINAL);
+      buffer.putShort(ordinal);
+    }
+  }
+
+  /**
+   * Reads ordinal as written by {@link #writeOrdinal} from given {@link 
DataInput}.
+   */
+  public static short readOrdinal(DataInput in) throws IOException {
+    final byte ordinal = in.readByte();
+    if (ordinal != TOKEN_ORDINAL) {
+      return ordinal;
+    } else {
+      return in.readShort();
+    }
+  }
+
+  /**
+   * Reads ordinal as written by writeOrdinal from given InputStream. Returns 
-1 on
+   * end of stream.
+   */
+  public static short readOrdinalFromInputStream(InputStream is) throws 
IOException {
+    final int ordinal = is.read();
+    if (ordinal != -1) {
+      if (ordinal != TOKEN_ORDINAL_INT) {
+        return (short) ordinal;
+      } else {
+        // two byte ordinal
+        final int ordinalPart1 = is.read();
+        final int ordinalPart2 = is.read();
+        if ((ordinalPart1 | ordinalPart2) >= 0) {
+          return (short) ((ordinalPart1 << 8) | ordinalPart2);
+        } else {
+          return -1;
+        }
+      }
+    } else {
+      return -1;
+    }
+  }
+
+  public String getMethodSuffix() {
+    return "" + ordinal;
+  }
+
+  @Override
+  public String toString() {
+    return "Version{" +
+        "ordinal=" + ordinal +
+        '}';
+  }
+
+  @Override
+  public int compareTo(SerializationVersion other) {
+    if (other != null) {
+      // byte min/max can't overflow int, so use (a-b)
+      final int thisOrdinal = this.ordinal;
+      final int otherOrdinal = other.ordinal;
+      return (thisOrdinal - otherOrdinal);
+    } else {
+      return 1;
+    }
+  }
+}
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersions.java
 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersions.java
index 1d9d2d3..5eeaf02 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersions.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersions.java
@@ -14,8 +14,6 @@
  */
 package org.apache.geode.internal.serialization;
 
-import org.apache.geode.internal.Version;
-
 /**
  * This interface is extended by DataSerializableFixedID and 
VersionedDataSerializable in order to
  * furnish version information to the serialization infrastructure for 
backward compatibility
@@ -34,6 +32,6 @@ public interface SerializationVersions {
    * The method name is formed with the version's product name and its major, 
minor, release and
    * patch numbers.
    */
-  Version[] getSerializationVersions();
+  SerializationVersion[] getSerializationVersions();
 
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataInputStream.java
 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataInputStream.java
index ff0fc2f..b9f0b65 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataInputStream.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataInputStream.java
@@ -28,7 +28,7 @@ import java.io.InputStream;
  */
 public class VersionedDataInputStream extends DataInputStream implements 
VersionedDataStream {
 
-  private final short version;
+  private final SerializationVersion version;
 
   /**
    * Creates a VersionedDataInputStream that uses the specified underlying 
InputStream.
@@ -36,7 +36,7 @@ public class VersionedDataInputStream extends DataInputStream 
implements Version
    * @param in the specified input stream
    * @param version the product version that serialized object on the given 
input stream
    */
-  public VersionedDataInputStream(InputStream in, short version) {
+  public VersionedDataInputStream(InputStream in, SerializationVersion 
version) {
     super(in);
     this.version = version;
   }
@@ -45,7 +45,7 @@ public class VersionedDataInputStream extends DataInputStream 
implements Version
    * {@inheritDoc}
    */
   @Override
-  public short getVersionOrdinal() {
+  public SerializationVersion getVersion() {
     return this.version;
   }
 
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataOutputStream.java
 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataOutputStream.java
index ddbba6e..9e582f6 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataOutputStream.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataOutputStream.java
@@ -26,7 +26,7 @@ import java.io.OutputStream;
  */
 public class VersionedDataOutputStream extends DataOutputStream implements 
VersionedDataStream {
 
-  private final short version;
+  private SerializationVersion version;
 
   /**
    * Creates a VersionedDataOutputStream that wraps the specified underlying 
OutputStream.
@@ -34,7 +34,7 @@ public class VersionedDataOutputStream extends 
DataOutputStream implements Versi
    * @param out the underlying output stream
    * @param version the product version that serialized object on the given 
{@link OutputStream}
    */
-  public VersionedDataOutputStream(OutputStream out, short version) {
+  public VersionedDataOutputStream(OutputStream out, SerializationVersion 
version) {
     super(out);
     this.version = version;
   }
@@ -43,7 +43,7 @@ public class VersionedDataOutputStream extends 
DataOutputStream implements Versi
    * {@inheritDoc}
    */
   @Override
-  public short getVersionOrdinal() {
+  public SerializationVersion getVersion() {
     return this.version;
   }
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataStream.java
 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataStream.java
index d7f7b7a..93dbca9 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataStream.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataStream.java
@@ -37,10 +37,10 @@ public interface VersionedDataStream {
 
   /**
    * If the remote peer to which this input/output is connected has a lower 
version that this
-   * member, then this returns the {@link Version} ordinal of the peer else 0. 
If the peer has a
+   * member, then this returns the {@link Version} of the peer else null. If 
the peer has a
    * higher
    * {@link Version}, then this member cannot do any adjustment to 
serialization and its the remote
    * peer's responsibility to adjust the serialization/deserialization 
according to this peer.
    */
-  short getVersionOrdinal();
+  SerializationVersion getVersion();
 }
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
index 60113e8..ef3c430 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java
@@ -80,6 +80,7 @@ import org.apache.geode.internal.net.BufferPool;
 import org.apache.geode.internal.net.NioFilter;
 import org.apache.geode.internal.net.NioPlainEngine;
 import org.apache.geode.internal.net.SocketCreator;
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.tcp.MsgReader.Header;
 import org.apache.geode.internal.util.concurrent.ReentrantSemaphore;
 
@@ -651,7 +652,8 @@ public class Connection implements Runnable {
       bb.putInt(cfg.getAsyncQueueTimeout());
       bb.putInt(cfg.getAsyncMaxQueueSize());
       // write own product version
-      Version.writeOrdinal(bb, Version.CURRENT.ordinal(), true);
+      SerializationVersion
+          .writeOrdinal(bb, Version.CURRENT.ordinal(), true);
       // now set the msg length into position 0
       bb.putInt(0, calcHdrSize(bb.position() - MSG_HEADER_BYTES));
       my_okHandshakeBuf = bb;
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgDestreamer.java 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgDestreamer.java
index 814ce1f..8de1b3f 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgDestreamer.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgDestreamer.java
@@ -240,7 +240,7 @@ public class MsgDestreamer {
           final Version v = version;
           DataInputStream dis =
               v == null ? new DataInputStream(this.is)
-                  : new VersionedDataInputStream(this.is, v.ordinal());
+                  : new VersionedDataInputStream(this.is, v);
           long startSer = this.stats.startMsgDeserialization();
           setResult((DistributionMessage) 
InternalDataSerializer.readDSFID(dis));
           this.stats.endMsgDeserialization(startSer);
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedByteBufferInputStream.java
 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedByteBufferInputStream.java
index 0945afe..d77686b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedByteBufferInputStream.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedByteBufferInputStream.java
@@ -18,6 +18,7 @@ package org.apache.geode.internal.tcp;
 import java.nio.ByteBuffer;
 
 import org.apache.geode.internal.Version;
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.serialization.VersionedDataStream;
 
 /**
@@ -57,11 +58,8 @@ public class VersionedByteBufferInputStream extends 
ByteBufferInputStream
    * {@inheritDoc}
    */
   @Override
-  public short getVersionOrdinal() {
-    if (version == null) {
-      return Version.CURRENT_ORDINAL;
-    }
-    return this.version.ordinal();
+  public SerializationVersion getVersion() {
+    return this.version;
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
index 9eed78c..020c90b 100644
--- 
a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
+++ 
b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java
@@ -21,6 +21,7 @@ import org.apache.geode.distributed.internal.DMStats;
 import org.apache.geode.distributed.internal.DistributionMessage;
 import org.apache.geode.internal.Version;
 import org.apache.geode.internal.net.BufferPool;
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.serialization.VersionedDataStream;
 
 /**
@@ -42,11 +43,8 @@ class VersionedMsgStreamer extends MsgStreamer implements 
VersionedDataStream {
    * {@inheritDoc}
    */
   @Override
-  public short getVersionOrdinal() {
-    if (version == null) {
-      return Version.CURRENT_ORDINAL;
-    }
-    return this.version.ordinal();
+  public SerializationVersion getVersion() {
+    return this.version;
   }
 
   /**
diff --git 
a/geode-core/src/main/java/org/apache/geode/internal/util/BlobHelper.java 
b/geode-core/src/main/java/org/apache/geode/internal/util/BlobHelper.java
index f045151..82b2ee7 100644
--- a/geode-core/src/main/java/org/apache/geode/internal/util/BlobHelper.java
+++ b/geode-core/src/main/java/org/apache/geode/internal/util/BlobHelper.java
@@ -95,7 +95,7 @@ public class BlobHelper {
       if (in == null) {
         in = new ByteArrayDataInput();
       }
-      in.initialize(blob, version == null ? 0 : version.ordinal());
+      in.initialize(blob, version);
       result = DataSerializer.readObject(in);
     }
     endDeserialization(start, blob.length);
diff --git a/geode-core/src/test/java/org/apache/geode/DataSerializerTest.java 
b/geode-core/src/test/java/org/apache/geode/DataSerializerTest.java
index 823ee30..5ae74a7 100644
--- a/geode-core/src/test/java/org/apache/geode/DataSerializerTest.java
+++ b/geode-core/src/test/java/org/apache/geode/DataSerializerTest.java
@@ -57,9 +57,9 @@ public class DataSerializerTest {
   public void readStringShouldReturnCanonicalEmptyString() throws IOException {
     byte[] serializedEmptyStringBytes = BlobHelper.serializeToBlob("");
     ByteArrayDataInput dataInput1 = new ByteArrayDataInput();
-    dataInput1.initialize(serializedEmptyStringBytes, 0);
+    dataInput1.initialize(serializedEmptyStringBytes, null);
     ByteArrayDataInput dataInput2 = new ByteArrayDataInput();
-    dataInput2.initialize(serializedEmptyStringBytes, 0);
+    dataInput2.initialize(serializedEmptyStringBytes, null);
 
     String firstRead = DataSerializer.readString(dataInput1);
     String secondRead = DataSerializer.readString(dataInput2);
diff --git 
a/geode-core/src/test/java/org/apache/geode/cache/execute/FunctionAdapterJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/cache/execute/FunctionAdapterJUnitTest.java
index 734c9e9..cbc2ca5 100644
--- 
a/geode-core/src/test/java/org/apache/geode/cache/execute/FunctionAdapterJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/cache/execute/FunctionAdapterJUnitTest.java
@@ -77,7 +77,7 @@ public class FunctionAdapterJUnitTest {
                 + 
"serializedFunctionAdapterWithDifferentSerialVersionUID.ser").getAbsolutePath());
 
     DataInputStream dis =
-        new VersionedDataInputStream(new DataInputStream(fis), 
Version.GFE_82.ordinal());
+        new VersionedDataInputStream(new DataInputStream(fis), Version.GFE_82);
     Object o = InternalDataSerializer.basicReadObject(dis);
     assertTrue(o instanceof FunctionAdapter);
   }
diff --git 
a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberJUnitTest.java
index 29f7dac..7c41ee5 100644
--- 
a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberJUnitTest.java
@@ -202,7 +202,7 @@ public class GMSMemberJUnitTest {
     dataOutput = new HeapDataOutputStream(Version.GFE_90);
     member.writeEssentialData(dataOutput);
     bais = new ByteArrayInputStream(baos.toByteArray());
-    dataInput = new VersionedDataInputStream(new DataInputStream(bais), 
Version.GFE_90.ordinal());
+    dataInput = new VersionedDataInputStream(new DataInputStream(bais), 
Version.GFE_90);
     newMember = new GMSMember();
     newMember.readEssentialData(dataInput);
     assertEquals(0, newMember.getVmKind());
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java
index 015f949..aa71e0c 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java
@@ -239,7 +239,7 @@ public class ByteArrayDataInputTest {
       return new java.io.DataInputStream(new 
java.io.ByteArrayInputStream(inputBytes));
     } else {
       ByteArrayDataInput input = new ByteArrayDataInput();
-      input.initialize(inputBytes, 0);
+      input.initialize(inputBytes, null);
       return input;
     }
   }
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/DataSerializableJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/DataSerializableJUnitTest.java
index bb53fe4..d731d97 100755
--- 
a/geode-core/src/test/java/org/apache/geode/internal/DataSerializableJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/DataSerializableJUnitTest.java
@@ -1165,13 +1165,13 @@ public class DataSerializableJUnitTest implements 
Serializable {
     VersionedDataSerializableImpl ds = new 
VersionedDataSerializableImpl(getRandom());
 
     VersionedDataOutputStream v =
-        new VersionedDataOutputStream(this.baos, Version.GFE_70.ordinal());
+        new VersionedDataOutputStream(this.baos, Version.GFE_70);
     DataSerializer.writeObject(ds, v);
     v.flush();
 
     ByteBuffer bb = ByteBuffer.wrap(this.baos.toByteArray());
     ByteBufferInputStream bbis = new ByteBufferInputStream(bb);
-    VersionedDataInputStream vin = new VersionedDataInputStream(bbis, 
Version.GFE_70.ordinal());
+    VersionedDataInputStream vin = new VersionedDataInputStream(bbis, 
Version.GFE_70);
     VersionedDataSerializableImpl ds2 =
         (VersionedDataSerializableImpl) DataSerializer.readObject(vin);
 
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java 
b/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java
index 014f94d..067fee2 100644
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java
@@ -52,7 +52,7 @@ public class FilterInfoTest {
     byte[] outputBytes = dataOut.toByteArray();
     FilterInfo deserialized = new FilterInfo();
     ByteArrayDataInput dataInput = new ByteArrayDataInput();
-    dataInput.initialize(outputBytes, Version.CURRENT_ORDINAL);
+    dataInput.initialize(outputBytes, Version.CURRENT);
     deserialized.fromData(dataInput);
     assertThat(deserialized.getCQs()).isEqualTo(cqs);
     assertThat(deserialized.getInterestedClients()).isEqualTo(clients);
diff --git 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/EventIdOptimizationJUnitTest.java
 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/EventIdOptimizationJUnitTest.java
index d185e9d..a5ea15a 100755
--- 
a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/EventIdOptimizationJUnitTest.java
+++ 
b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/EventIdOptimizationJUnitTest.java
@@ -180,7 +180,7 @@ public class EventIdOptimizationJUnitTest {
 
 
     HeapDataOutputStream hdos90 = new HeapDataOutputStream(256, 
Version.GFE_90);
-    VersionedDataOutputStream dop = new VersionedDataOutputStream(hdos90, 
Version.GFE_90.ordinal());
+    VersionedDataOutputStream dop = new VersionedDataOutputStream(hdos90, 
Version.GFE_90);
 
     eventID.toData(dop);
 
@@ -188,7 +188,7 @@ public class EventIdOptimizationJUnitTest {
 
 
     VersionedDataInputStream dataInputStream =
-        new VersionedDataInputStream(bais, Version.GFE_90.ordinal());
+        new VersionedDataInputStream(bais, Version.GFE_90);
 
     EventID eventID2 = new EventID();
     eventID2.fromData(dataInputStream);
diff --git 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/results/PageEntryJUnitTest.java
 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/results/PageEntryJUnitTest.java
index 88f9e4e..2b84108 100644
--- 
a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/results/PageEntryJUnitTest.java
+++ 
b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/results/PageEntryJUnitTest.java
@@ -84,7 +84,7 @@ public class PageEntryJUnitTest {
     entry.toData(out);
     final byte[] bytes = out.toByteArray();
     ByteArrayDataInput in = new ByteArrayDataInput();
-    in.initialize(bytes, (short) 0);
+    in.initialize(bytes, null);
     PageEntry newEntry = new PageEntry();
     newEntry.fromData(in);
     return newEntry;
diff --git 
a/geode-old-client-support/src/distributedTest/java/org/apache/geode/OldClientSupportDUnitTest.java
 
b/geode-old-client-support/src/distributedTest/java/org/apache/geode/OldClientSupportDUnitTest.java
index 4d21631..d2e9ff1 100644
--- 
a/geode-old-client-support/src/distributedTest/java/org/apache/geode/OldClientSupportDUnitTest.java
+++ 
b/geode-old-client-support/src/distributedTest/java/org/apache/geode/OldClientSupportDUnitTest.java
@@ -109,7 +109,7 @@ public class OldClientSupportDUnitTest extends 
JUnit4CacheTestCase {
 
     Version oldClientVersion = Version.GFE_82;
     VersionedDataOutputStream dout = new VersionedDataOutputStream(
-        new HeapDataOutputStream(10, oldClientVersion), 
oldClientVersion.ordinal());
+        new HeapDataOutputStream(10, oldClientVersion), oldClientVersion);
 
     for (String geodeClassName : newArrayClassNames) {
       String newName = 
oldClientSupport.processOutgoingClassName(geodeClassName, dout);
@@ -122,7 +122,7 @@ public class OldClientSupportDUnitTest extends 
JUnit4CacheTestCase {
     }
 
     VersionedDataInputStream din = new VersionedDataInputStream(
-        new DataInputStream(new ByteArrayInputStream(new byte[10])), 
oldClientVersion.ordinal());
+        new DataInputStream(new ByteArrayInputStream(new byte[10])), 
oldClientVersion);
 
     for (String oldClassName : oldArrayClassNames) {
       String newName = oldClientSupport.processIncomingClassName(oldClassName, 
din);
@@ -198,7 +198,7 @@ public class OldClientSupportDUnitTest extends 
JUnit4CacheTestCase {
     byte[] serializedForm = byteStream.toByteArray();
 
     ByteArrayDataInput byteDataInput = new ByteArrayDataInput();
-    byteDataInput.initialize(serializedForm, Version.GFE_82.ordinal());
+    byteDataInput.initialize(serializedForm, Version.GFE_82);
     ClientSerializableObject result = DataSerializer.readObject(byteDataInput);
     Assert.assertEquals("Expected an org.apache.geode exception but found " + 
result,
         result.getClass().getName().substring(0, "org.apache.geode".length()), 
"org.apache.geode");
@@ -225,7 +225,7 @@ public class OldClientSupportDUnitTest extends 
JUnit4CacheTestCase {
     byte[] serializedForm = byteStream.toByteArray();
 
     ByteArrayDataInput byteDataInput = new ByteArrayDataInput();
-    byteDataInput.initialize(serializedForm, Version.GFE_82.ordinal());
+    byteDataInput.initialize(serializedForm, Version.GFE_82);
     Object result = DataSerializer.readObject(byteDataInput);
     Assert.assertEquals("Expected an org.apache.geode object but found " + 
result,
         result.getClass().getName().substring(0, "org.apache.geode".length()), 
"org.apache.geode");
@@ -249,7 +249,7 @@ public class OldClientSupportDUnitTest extends 
JUnit4CacheTestCase {
     byte[] serializedForm = byteStream.toByteArray();
 
     ByteArrayDataInput byteDataInput = new ByteArrayDataInput();
-    byteDataInput.initialize(serializedForm, Version.GFE_82.ordinal());
+    byteDataInput.initialize(serializedForm, Version.GFE_82);
     Object result = DataSerializer.readObject(byteDataInput);
     Assert.assertEquals("Expected an org.apache.geode object but found " + 
result,
         result.getClass().getName().substring(0, "org.apache.geode".length()), 
"org.apache.geode");
diff --git 
a/geode-old-client-support/src/main/java/com/gemstone/gemfire/OldClientSupportProvider.java
 
b/geode-old-client-support/src/main/java/com/gemstone/gemfire/OldClientSupportProvider.java
index d6878e4..a0695ce 100644
--- 
a/geode-old-client-support/src/main/java/com/gemstone/gemfire/OldClientSupportProvider.java
+++ 
b/geode-old-client-support/src/main/java/com/gemstone/gemfire/OldClientSupportProvider.java
@@ -26,6 +26,7 @@ import org.apache.geode.internal.Version;
 import org.apache.geode.internal.cache.CacheService;
 import org.apache.geode.internal.cache.InternalCache;
 import org.apache.geode.internal.cache.tier.sockets.OldClientSupportService;
+import org.apache.geode.internal.serialization.SerializationVersion;
 import org.apache.geode.internal.serialization.VersionedDataOutputStream;
 import org.apache.geode.management.internal.beans.CacheServiceMBeanBase;
 
@@ -106,7 +107,7 @@ public class OldClientSupportProvider implements 
OldClientSupportService {
     // if the client is old then it needs com.gemstone.gemfire package names
     if (out instanceof VersionedDataOutputStream) {
       VersionedDataOutputStream vout = (VersionedDataOutputStream) out;
-      Version version = Version.getVersionForDataStream(vout);
+      SerializationVersion version = vout.getVersion();
       if (version != null && version.compareTo(Version.GFE_90) < 0) {
         return processClassName(name, GEODE, GEMFIRE, newClassNamesToOld);
       }

Reply via email to