This is an automated email from the ASF dual-hosted git repository. bschuchardt pushed a commit to branch dsfid_separation_wip in repository https://gitbox.apache.org/repos/asf/geode.git
commit 26e6aabf9729de216d726f135f707d4dfdce94c9 Author: Bruce Schuchardt <bschucha...@pivotal.io> AuthorDate: Fri Aug 23 15:50:31 2019 -0700 introduction of SerializationVersion, a superclass of Version --- ...ackwardCompatibilitySerializationDUnitTest.java | 11 +- .../cache/client/ClientCacheFactoryJUnitTest.java | 4 +- .../internal/InternalDataSerializerBenchmark.java | 2 +- .../client/internal/ClientSideHandshakeImpl.java | 16 +-- .../membership/InternalDistributedMember.java | 13 +- .../internal/membership/gms/GMSMember.java | 15 ++- .../membership/gms/messenger/JGroupsMessenger.java | 7 +- .../distributed/internal/tcpserver/TcpClient.java | 17 ++- .../distributed/internal/tcpserver/TcpServer.java | 4 +- .../geode/internal/HeapDataOutputStream.java | 4 +- .../geode/internal/InternalDataSerializer.java | 35 +++--- .../java/org/apache/geode/internal/Version.java | 130 ++----------------- .../geode/internal/VersionedObjectInput.java | 8 +- .../geode/internal/VersionedObjectOutput.java | 8 +- .../apache/geode/internal/cache/DiskInitFile.java | 3 +- .../org/apache/geode/internal/cache/EventID.java | 2 +- .../geode/internal/cache/FilterRoutingInfo.java | 2 +- .../org/apache/geode/internal/cache/Oplog.java | 10 +- .../cache/persistence/DiskInitFileParser.java | 3 +- .../tier/sockets/ClientRegistrationMetadata.java | 7 +- .../tier/sockets/ServerSideHandshakeFactory.java | 3 +- .../tier/sockets/ServerSideHandshakeImpl.java | 19 +-- .../internal/cache/wan/GatewaySenderEventImpl.java | 2 +- .../serialization/BufferDataOutputStream.java | 11 +- .../internal/serialization/ByteArrayDataInput.java | 17 +-- .../internal/serialization/DSFIDSerializer.java | 15 ++- .../serialization/SerializationVersion.java | 140 +++++++++++++++++++++ .../serialization/SerializationVersions.java | 4 +- .../serialization/VersionedDataInputStream.java | 6 +- .../serialization/VersionedDataOutputStream.java | 6 +- .../serialization/VersionedDataStream.java | 4 +- .../org/apache/geode/internal/tcp/Connection.java | 4 +- .../apache/geode/internal/tcp/MsgDestreamer.java | 2 +- .../tcp/VersionedByteBufferInputStream.java | 8 +- .../geode/internal/tcp/VersionedMsgStreamer.java | 8 +- .../org/apache/geode/internal/util/BlobHelper.java | 2 +- .../java/org/apache/geode/DataSerializerTest.java | 4 +- .../cache/execute/FunctionAdapterJUnitTest.java | 2 +- .../membership/gms/GMSMemberJUnitTest.java | 2 +- .../geode/internal/ByteArrayDataInputTest.java | 2 +- .../geode/internal/DataSerializableJUnitTest.java | 4 +- .../geode/internal/cache/FilterInfoTest.java | 2 +- .../cache/ha/EventIdOptimizationJUnitTest.java | 4 +- .../internal/results/PageEntryJUnitTest.java | 2 +- .../apache/geode/OldClientSupportDUnitTest.java | 10 +- .../gemstone/gemfire/OldClientSupportProvider.java | 3 +- 46 files changed, 319 insertions(+), 268 deletions(-) diff --git a/geode-core/src/distributedTest/java/org/apache/geode/internal/BackwardCompatibilitySerializationDUnitTest.java b/geode-core/src/distributedTest/java/org/apache/geode/internal/BackwardCompatibilitySerializationDUnitTest.java index 7a0396c..b6a774c 100755 --- a/geode-core/src/distributedTest/java/org/apache/geode/internal/BackwardCompatibilitySerializationDUnitTest.java +++ b/geode-core/src/distributedTest/java/org/apache/geode/internal/BackwardCompatibilitySerializationDUnitTest.java @@ -35,6 +35,7 @@ import org.junit.experimental.categories.Category; import org.apache.geode.cache.Cache; import org.apache.geode.internal.cache.DistributedPutAllOperation.EntryVersionsList; import org.apache.geode.internal.serialization.DataSerializableFixedID; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.serialization.SerializationVersions; import org.apache.geode.internal.serialization.VersionedDataInputStream; import org.apache.geode.internal.serialization.VersionedDataOutputStream; @@ -93,7 +94,7 @@ public class BackwardCompatibilitySerializationDUnitTest extends JUnit4CacheTest @Test public void testToDataFromHigherVersionToLower() throws Exception { DataOutputStream dos = - new VersionedDataOutputStream(new DataOutputStream(baos), Version.GFE_56.ordinal()); + new VersionedDataOutputStream(new DataOutputStream(baos), Version.GFE_56); InternalDataSerializer.writeDSFID(msg, dos); assertTrue(toDataPre66Called); assertFalse(toDataCalled); @@ -107,7 +108,7 @@ public class BackwardCompatibilitySerializationDUnitTest extends JUnit4CacheTest @Test public void testToDataFromLowerVersionToHigher() throws Exception { DataOutputStream dos = - new VersionedDataOutputStream(new DataOutputStream(baos), Version.GFE_701.ordinal()); + new VersionedDataOutputStream(new DataOutputStream(baos), Version.GFE_701); InternalDataSerializer.writeDSFID(msg, dos); assertTrue(toDataCalled); } @@ -123,7 +124,7 @@ public class BackwardCompatibilitySerializationDUnitTest extends JUnit4CacheTest this.bais = new ByteArrayInputStream(baos.toByteArray()); DataInputStream dis = - new VersionedDataInputStream(new DataInputStream(bais), Version.GFE_701.ordinal()); + new VersionedDataInputStream(new DataInputStream(bais), Version.GFE_701); Object o = InternalDataSerializer.basicReadObject(dis); assertTrue(o instanceof TestMessage); assertTrue(fromDataCalled); @@ -140,7 +141,7 @@ public class BackwardCompatibilitySerializationDUnitTest extends JUnit4CacheTest this.bais = new ByteArrayInputStream(baos.toByteArray()); DataInputStream dis = - new VersionedDataInputStream(new DataInputStream(bais), Version.GFE_56.ordinal()); + new VersionedDataInputStream(new DataInputStream(bais), Version.GFE_56); Object o = InternalDataSerializer.basicReadObject(dis); assertTrue(o instanceof TestMessage); assertTrue(fromDataPre66Called); @@ -192,7 +193,7 @@ public class BackwardCompatibilitySerializationDUnitTest extends JUnit4CacheTest } private void checkSupportForRollingUpgrade(Object ds) { - Version[] versions = null; + SerializationVersion[] versions = null; if (ds instanceof SerializationVersions) { versions = ((SerializationVersions) ds).getSerializationVersions(); } diff --git a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/ClientCacheFactoryJUnitTest.java b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/ClientCacheFactoryJUnitTest.java index fbebdd6..cfa5ee0 100644 --- a/geode-core/src/integrationTest/java/org/apache/geode/cache/client/ClientCacheFactoryJUnitTest.java +++ b/geode-core/src/integrationTest/java/org/apache/geode/cache/client/ClientCacheFactoryJUnitTest.java @@ -356,7 +356,7 @@ public class ClientCacheFactoryJUnitTest { DataInputStream in = new VersionedDataInputStream(new ByteArrayInputStream(out.toByteArray()), - Version.CURRENT_ORDINAL); + Version.CURRENT); ClientProxyMembershipID newID = DataSerializer.readObject(in); InternalDistributedMember newMemberID = (InternalDistributedMember) newID.getDistributedMember(); @@ -374,7 +374,7 @@ public class ClientCacheFactoryJUnitTest { DataSerializer.writeObject(clientID, out); in = new VersionedDataInputStream(new ByteArrayInputStream(out.toByteArray()), - Version.CURRENT_ORDINAL); + Version.CURRENT); newID = DataSerializer.readObject(in); newMemberID = (InternalDistributedMember) newID.getDistributedMember(); assertThat(newMemberID.getVersionObject()).isEqualTo(Version.CURRENT); diff --git a/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java b/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java index cc12370..cddc566 100644 --- a/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java +++ b/geode-core/src/jmh/java/org/apache/geode/internal/InternalDataSerializerBenchmark.java @@ -66,7 +66,7 @@ public class InternalDataSerializerBenchmark { @BenchmarkMode(Mode.Throughput) @OutputTimeUnit(TimeUnit.MILLISECONDS) public String readStringBenchmark() throws IOException { - dataInput.initialize(serializedBytes, Version.CURRENT_ORDINAL); + dataInput.initialize(serializedBytes, Version.CURRENT); String result = InternalDataSerializer.readString(dataInput, DSCODE.STRING_BYTES.toByte()); return result; } diff --git a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java index 3a61b0d..4dd4cf8 100644 --- a/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java +++ b/geode-core/src/main/java/org/apache/geode/cache/client/internal/ClientSideHandshakeImpl.java @@ -62,6 +62,7 @@ import org.apache.geode.internal.cache.tier.sockets.Handshake; import org.apache.geode.internal.cache.tier.sockets.ServerQueueStatus; import org.apache.geode.internal.security.SecurityService; import org.apache.geode.internal.serialization.ByteArrayDataInput; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.serialization.VersionedDataInputStream; import org.apache.geode.internal.serialization.VersionedDataOutputStream; import org.apache.geode.security.AuthenticationFailedException; @@ -211,11 +212,11 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand // Successful handshake for GATEWAY_TO_GATEWAY mode sets the peer version in connection if (communicationMode.isWAN() && !(acceptanceCode == REPLY_EXCEPTION_AUTHENTICATION_REQUIRED || acceptanceCode == REPLY_EXCEPTION_AUTHENTICATION_FAILED)) { - short wanSiteVersion = Version.readOrdinal(dis); + short wanSiteVersion = SerializationVersion.readOrdinal(dis); conn.setWanSiteVersion(wanSiteVersion); // establish a versioned stream for the other site, if necessary if (wanSiteVersion < Version.CURRENT_ORDINAL) { - dis = new VersionedDataInputStream(dis, wanSiteVersion); + dis = new VersionedDataInputStream(dis, Version.fromOrdinalOrCurrent(wanSiteVersion)); } } @@ -234,7 +235,8 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand // Read delta-propagation property value from server. // [sumedh] Static variable below? Client can connect to different // DSes with different values of this. It shoule be a member variable. - if (!communicationMode.isWAN() && currentClientVersion.compareTo(Version.GFE_61) >= 0) { + if (!communicationMode.isWAN() && currentClientVersion.compareTo( + Version.GFE_61) >= 0) { ((InternalDistributedSystem) system).setDeltaEnabledOnServer(dis.readBoolean()); } @@ -270,7 +272,7 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand byte[] memberBytes = DataSerializer.readByteArray(p_dis); Version v = InternalDataSerializer.getVersionForDataStreamOrNull(p_dis); - ByteArrayDataInput dis = new ByteArrayDataInput(memberBytes, v == null ? 0 : v.ordinal()); + ByteArrayDataInput dis = new ByteArrayDataInput(memberBytes, v); try { return DataSerializer.readObject(dis); } catch (EOFException e) { @@ -372,9 +374,9 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand hdos.writeByte(communicationMode.getModeNumber()); if (overrideClientVersion > 0) { // for testing - Version.writeOrdinal(hdos, overrideClientVersion, true); + SerializationVersion.writeOrdinal(hdos, overrideClientVersion, true); } else { - Version.writeOrdinal(hdos, currentClientVersion.ordinal(), true); + SerializationVersion.writeOrdinal(hdos, currentClientVersion.ordinal(), true); } hdos.writeByte(replyCode); @@ -389,7 +391,7 @@ public class ClientSideHandshakeImpl extends Handshake implements ClientSideHand // we do not know the receiver's version at this point, but the on-wire // form of InternalDistributedMember changed in 9.0, so we must serialize // it using the previous version - DataOutput idOut = new VersionedDataOutputStream(hdos, Version.GFE_82.ordinal()); + DataOutput idOut = new VersionedDataOutputStream(hdos, Version.GFE_82); DataSerializer.writeObject(this.id, idOut); if (currentClientVersion.compareTo(Version.GFE_603) >= 0) { diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java index 8c0c597..df93ba7 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/InternalDistributedMember.java @@ -50,6 +50,7 @@ import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.versions.VersionSource; import org.apache.geode.internal.net.SocketCreator; import org.apache.geode.internal.serialization.DataSerializableFixedID; +import org.apache.geode.internal.serialization.SerializationVersion; /** * This is the fundamental representation of a member of a GemFire distributed system. @@ -115,7 +116,8 @@ public class InternalDistributedMember implements DistributedMember, Externaliza /** The versions in which this message was modified */ @Immutable - private static final Version[] dsfidVersions = new Version[] {Version.GFE_71, Version.GFE_90}; + private static final Version[] dsfidVersions = new Version[] { + Version.GFE_71, Version.GFE_90}; private void defaultToCurrentHost() { netMbr.setProcessId(OSProcess.getId()); @@ -743,7 +745,7 @@ public class InternalDistributedMember implements DistributedMember, Externaliza private short readVersion(int flags, DataInput in) throws IOException { if ((flags & VERSION_BIT) != 0) { - short version = Version.readOrdinal(in); + short version = SerializationVersion.readOrdinal(in); this.versionObj = Version.fromOrdinalNoThrow(version, false); return version; } else { @@ -799,7 +801,7 @@ public class InternalDistributedMember implements DistributedMember, Externaliza DataSerializer.writeString(attributes == null ? "" : attributes.getId(), out); DataSerializer.writeInteger(Integer.valueOf(attributes == null ? 300 : attributes.getTimeout()), out); - Version.writeOrdinal(out, netMbr.getVersionOrdinal(), true); + SerializationVersion.writeOrdinal(out, netMbr.getVersionOrdinal(), true); netMbr.writeAdditionalData(out); } @@ -908,7 +910,7 @@ public class InternalDistributedMember implements DistributedMember, Externaliza durableClientAttributes == null ? 300 : durableClientAttributes.getTimeout()), out); short version = netMbr.getVersionOrdinal(); - Version.writeOrdinal(out, version, true); + SerializationVersion.writeOrdinal(out, version, true); } public void toDataPre_GFE_7_1_0_0(DataOutput out) throws IOException { @@ -1100,7 +1102,8 @@ public class InternalDistributedMember implements DistributedMember, Externaliza netMbr = MemberFactory.newNetMember(inetAddr, hostName, port, sbEnabled, elCoord, InternalDataSerializer.getVersionForDataStream(in).ordinal(), attr); - if (InternalDataSerializer.getVersionForDataStream(in).compareTo(Version.GFE_90) == 0) { + if (InternalDataSerializer.getVersionForDataStream(in).compareTo( + Version.GFE_90) == 0) { netMbr.readAdditionalData(in); } } diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMember.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMember.java index f059e64..0b8b996 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMember.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/GMSMember.java @@ -28,6 +28,7 @@ import org.apache.geode.internal.InternalDataSerializer; import org.apache.geode.internal.Version; import org.apache.geode.internal.net.SocketCreator; import org.apache.geode.internal.serialization.DataSerializableFixedID; +import org.apache.geode.internal.serialization.SerializationVersion; /** * GMSMember is the membership identifier class for Group Membership Services. @@ -521,7 +522,7 @@ public class GMSMember implements DataSerializableFixedID { .writeString(durableId == null ? "" : durableId, out); DataSerializer.writeInteger(durableId == null ? 300 : durableTimeout, out); - Version.writeOrdinal(out, versionOrdinal, true); + SerializationVersion.writeOrdinal(out, versionOrdinal, true); if (versionOrdinal >= Version.GFE_90.ordinal()) { writeAdditionalData(out); @@ -529,7 +530,7 @@ public class GMSMember implements DataSerializableFixedID { } public void writeEssentialData(DataOutput out) throws IOException { - Version.writeOrdinal(out, this.versionOrdinal, true); + SerializationVersion.writeOrdinal(out, this.versionOrdinal, true); int flags = 0; if (networkPartitionDetectionEnabled) @@ -543,7 +544,8 @@ public class GMSMember implements DataSerializableFixedID { out.writeInt(vmViewId); out.writeLong(uuidMSBs); out.writeLong(uuidLSBs); - if (InternalDataSerializer.getVersionForDataStream(out).compareTo(Version.GEODE_1_2_0) >= 0) { + if (InternalDataSerializer.getVersionForDataStream(out).compareTo( + Version.GEODE_1_2_0) >= 0) { out.writeByte(vmKind); } } @@ -587,7 +589,7 @@ public class GMSMember implements DataSerializableFixedID { private short readVersion(int flags, DataInput in) throws IOException { if ((flags & VERSION_BIT) != 0) { - return Version.readOrdinal(in); + return SerializationVersion.readOrdinal(in); } else { // prior to 7.1 member IDs did not serialize their version information Version v = InternalDataSerializer.getVersionForDataStreamOrNull(in); @@ -607,7 +609,7 @@ public class GMSMember implements DataSerializableFixedID { } public void readEssentialData(DataInput in) throws IOException, ClassNotFoundException { - this.versionOrdinal = Version.readOrdinal(in); + this.versionOrdinal = SerializationVersion.readOrdinal(in); int flags = in.readShort(); this.networkPartitionDetectionEnabled = (flags & NPD_ENABLED_BIT) != 0; @@ -623,7 +625,8 @@ public class GMSMember implements DataSerializableFixedID { this.vmViewId = in.readInt(); this.uuidMSBs = in.readLong(); this.uuidLSBs = in.readLong(); - if (InternalDataSerializer.getVersionForDataStream(in).compareTo(Version.GEODE_1_2_0) >= 0) { + if (InternalDataSerializer.getVersionForDataStream(in).compareTo( + Version.GEODE_1_2_0) >= 0) { this.vmKind = in.readByte(); } this.isPartial = true; diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java index 75385d9..305a857 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/membership/gms/messenger/JGroupsMessenger.java @@ -97,6 +97,7 @@ import org.apache.geode.internal.admin.remote.RemoteTransportConfig; import org.apache.geode.internal.alerting.AlertingAction; import org.apache.geode.internal.cache.DistributedCacheOperation; import org.apache.geode.internal.net.SocketCreator; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.serialization.VersionedDataInputStream; import org.apache.geode.internal.tcp.MemberShunnedException; @@ -1002,10 +1003,10 @@ public class JGroupsMessenger implements Messenger { DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf, jgmsg.getOffset(), jgmsg.getLength())); - short ordinal = Version.readOrdinal(dis); + short ordinal = SerializationVersion.readOrdinal(dis); if (ordinal < Version.CURRENT_ORDINAL) { - dis = new VersionedDataInputStream(dis, ordinal); + dis = new VersionedDataInputStream(dis, Version.fromOrdinalNoThrow(ordinal, true)); } // read @@ -1101,7 +1102,7 @@ public class JGroupsMessenger implements Messenger { DataInputStream in = new DataInputStream(new ByteArrayInputStream(data)); if (ordinal < Version.CURRENT_ORDINAL) { - in = new VersionedDataInputStream(in, ordinal); + in = new VersionedDataInputStream(in, Version.fromOrdinalNoThrow(ordinal, true)); } GMSMessage result = deserializeMessage(in, ordinal); diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java index 29f0b8e..230b0c9 100644 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpClient.java @@ -34,6 +34,7 @@ import org.apache.logging.log4j.Logger; import org.apache.geode.DataSerializer; import org.apache.geode.annotations.internal.MakeNotStatic; +import org.apache.geode.cache.UnsupportedVersionException; import org.apache.geode.distributed.internal.DistributionConfig; import org.apache.geode.internal.Version; import org.apache.geode.internal.admin.SSLConfig; @@ -209,7 +210,7 @@ public class TcpClient { out = new DataOutputStream(new BufferedOutputStream(sock.getOutputStream())); if (serverVersion < Version.CURRENT_ORDINAL) { - out = new VersionedDataOutputStream(out, serverVersion); + out = new VersionedDataOutputStream(out, Version.fromOrdinalNoThrow(serverVersion, false)); } out.writeInt(gossipVersion); @@ -221,7 +222,7 @@ public class TcpClient { if (replyExpected) { DataInputStream in = new DataInputStream(sock.getInputStream()); - in = new VersionedDataInputStream(in, serverVersion); + in = new VersionedDataInputStream(in, Version.fromOrdinal(serverVersion, false)); try { Object response = DataSerializer.readObject(in); logger.debug("received response: {}", response); @@ -236,6 +237,13 @@ public class TcpClient { } else { return null; } + } catch (UnsupportedVersionException ex) { + if (logger.isDebugEnabled()) { + logger + .debug("Remote TcpServer version: " + serverVersion + " is higher than local version: " + + Version.CURRENT_ORDINAL + ". This is never expected as remoteVersion"); + } + return null; } finally { try { if (replyExpected) { @@ -288,8 +296,7 @@ public class TcpClient { try { OutputStream outputStream = new BufferedOutputStream(sock.getOutputStream()); DataOutputStream out = - new VersionedDataOutputStream(new DataOutputStream(outputStream), - Version.GFE_57.ordinal()); + new VersionedDataOutputStream(new DataOutputStream(outputStream), Version.GFE_57); out.writeInt(gossipVersion); @@ -299,7 +306,7 @@ public class TcpClient { InputStream inputStream = sock.getInputStream(); DataInputStream in = new DataInputStream(inputStream); - in = new VersionedDataInputStream(in, Version.GFE_57.ordinal()); + in = new VersionedDataInputStream(in, Version.GFE_57); try { Object readObject = DataSerializer.readObject(in); if (!(readObject instanceof VersionResponse)) { diff --git a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java index 99658d0..42f5150 100755 --- a/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java +++ b/geode-core/src/main/java/org/apache/geode/distributed/internal/tcpserver/TcpServer.java @@ -458,7 +458,7 @@ public class TcpServer { log.debug("Locator reading request from " + socket.getInetAddress() + " with version " + Version.fromOrdinal(versionOrdinal, false)); } - input = new VersionedDataInputStream(input, versionOrdinal); + input = new VersionedDataInputStream(input, Version.fromOrdinal(versionOrdinal, false)); request = DataSerializer.readObject(input); if (log.isDebugEnabled()) { log.debug("Locator received request " + request + " from " + socket.getInetAddress()); @@ -484,7 +484,7 @@ public class TcpServer { DataOutputStream output = new DataOutputStream(socket.getOutputStream()); if (versionOrdinal != Version.CURRENT_ORDINAL) { output = - new VersionedDataOutputStream(output, versionOrdinal); + new VersionedDataOutputStream(output, Version.fromOrdinal(versionOrdinal, false)); } DataSerializer.writeObject(response, output); output.flush(); diff --git a/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java index 7f16e5c..f99f371 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java +++ b/geode-core/src/main/java/org/apache/geode/internal/HeapDataOutputStream.java @@ -67,7 +67,7 @@ public class HeapDataOutputStream extends * instead referenced. */ public HeapDataOutputStream(int allocSize, Version version, boolean doNotCopy) { - super(allocSize, version == null ? Version.CURRENT_ORDINAL : version.ordinal(), doNotCopy); + super(allocSize, version, doNotCopy); } /** @@ -75,7 +75,7 @@ public class HeapDataOutputStream extends * instead referenced. */ public HeapDataOutputStream(ByteBuffer initialBuffer, Version version, boolean doNotCopy) { - super(initialBuffer, version == null ? Version.CURRENT_ORDINAL : version.ordinal(), doNotCopy); + super(initialBuffer, version, doNotCopy); } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java index a6ec1a9..af742fe 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java +++ b/geode-core/src/main/java/org/apache/geode/internal/InternalDataSerializer.java @@ -112,6 +112,7 @@ import org.apache.geode.internal.serialization.DSCODE; import org.apache.geode.internal.serialization.DSFIDSerializer; import org.apache.geode.internal.serialization.DataSerializableFixedID; import org.apache.geode.internal.serialization.DscodeHelper; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.serialization.SerializationVersions; import org.apache.geode.internal.serialization.VersionedDataStream; import org.apache.geode.internal.util.concurrent.CopyOnWriteHashMap; @@ -351,7 +352,7 @@ public abstract class InternalDataSerializer extends DataSerializer { } if (out instanceof VersionedDataStream) { VersionedDataStream vout = (VersionedDataStream) out; - Version version = Version.getVersionForDataStream(vout); + Version version = (Version) vout.getVersion(); if (null != version) { if (version.compareTo(Version.GEODE_1_9_0) < 0) { if (name.equals(POST_GEODE_190_SERVER_CQIMPL)) { @@ -2242,7 +2243,7 @@ public abstract class InternalDataSerializer extends DataSerializer { try { ObjectOutput oos = new ObjectOutputStream(stream); if (stream instanceof VersionedDataStream) { - Version v = Version.getVersionForDataStream((VersionedDataStream) stream); + Version v = (Version) ((VersionedDataStream) stream).getVersion(); if (v != null && v != Version.CURRENT) { oos = new VersionedObjectOutput(oos, v); } @@ -2280,7 +2281,7 @@ public abstract class InternalDataSerializer extends DataSerializer { if (Version.CURRENT != v && v != null) { // get versions where DataOutput was upgraded - Version[] versions = null; + SerializationVersion[] versions = null; if (ds instanceof SerializationVersions) { SerializationVersions sv = (SerializationVersions) ds; versions = sv.getSerializationVersions(); @@ -2288,7 +2289,7 @@ public abstract class InternalDataSerializer extends DataSerializer { // check if the version of the peer or diskstore is different and // there has been a change in the message if (versions != null) { - for (Version version : versions) { + for (SerializationVersion version : versions) { // if peer version is less than the greatest upgraded version if (v.compareTo(version) < 0) { ds.getClass().getMethod("toDataPre_" + version.getMethodSuffix(), @@ -2345,7 +2346,7 @@ public abstract class InternalDataSerializer extends DataSerializer { Version v = InternalDataSerializer.getVersionForDataStreamOrNull(in); if (Version.CURRENT != v && v != null) { // get versions where DataOutput was upgraded - Version[] versions = null; + SerializationVersion[] versions = null; if (ds instanceof SerializationVersions) { SerializationVersions vds = (SerializationVersions) ds; versions = vds.getSerializationVersions(); @@ -2353,7 +2354,7 @@ public abstract class InternalDataSerializer extends DataSerializer { // check if the version of the peer or diskstore is different and // there has been a change in the message if (versions != null) { - for (Version version : versions) { + for (SerializationVersion version : versions) { // if peer version is less than the greatest upgraded version if (v.compareTo(version) < 0) { ds.getClass().getMethod("fromDataPre" + '_' + version.getMethodSuffix(), @@ -2409,12 +2410,13 @@ public abstract class InternalDataSerializer extends DataSerializer { } /** - * Get the {@link Version} of the peer or disk store that created this {@link DataInput}. + * Get the {@link org.apache.geode.internal.Version} of the peer or disk store that created this + * {@link DataInput}. */ public static Version getVersionForDataStream(DataInput in) { // check if this is a versioned data input if (in instanceof VersionedDataStream) { - final Version v = Version.getVersionForDataStream((VersionedDataStream) in); + final Version v = (Version) ((VersionedDataStream) in).getVersion(); return v != null ? v : Version.CURRENT; } else { // assume latest version @@ -2423,13 +2425,14 @@ public abstract class InternalDataSerializer extends DataSerializer { } /** - * Get the {@link Version} of the peer or disk store that created this {@link DataInput}. Returns + * Get the {@link org.apache.geode.internal.Version} of the peer or disk store that created this + * {@link DataInput}. Returns * null if the version is same as this member's. */ public static Version getVersionForDataStreamOrNull(DataInput in) { // check if this is a versioned data input if (in instanceof VersionedDataStream) { - return Version.getVersionForDataStream((VersionedDataStream) in); + return (Version) ((VersionedDataStream) in).getVersion(); } else { // assume latest version return null; @@ -2437,12 +2440,13 @@ public abstract class InternalDataSerializer extends DataSerializer { } /** - * Get the {@link Version} of the peer or disk store that created this {@link DataOutput}. + * Get the {@link org.apache.geode.internal.Version} of the peer or disk store that created this + * {@link DataOutput}. */ public static Version getVersionForDataStream(DataOutput out) { // check if this is a versioned data output if (out instanceof VersionedDataStream) { - final Version v = Version.getVersionForDataStream((VersionedDataStream) out); + final Version v = (Version) ((VersionedDataStream) out).getVersion(); return v != null ? v : Version.CURRENT; } else { // assume latest version @@ -2451,13 +2455,14 @@ public abstract class InternalDataSerializer extends DataSerializer { } /** - * Get the {@link Version} of the peer or disk store that created this {@link DataOutput}. Returns + * Get the {@link org.apache.geode.internal.Version} of the peer or disk store that created this + * {@link DataOutput}. Returns * null if the version is same as this member's. */ public static Version getVersionForDataStreamOrNull(DataOutput out) { // check if this is a versioned data output if (out instanceof VersionedDataStream) { - return Version.getVersionForDataStream((VersionedDataStream) out); + return (Version) ((VersionedDataStream) out).getVersion(); } else { // assume latest version return null; @@ -2816,7 +2821,7 @@ public abstract class InternalDataSerializer extends DataSerializer { ObjectInput ois = new DSObjectInputStream(stream); serializationFilter.setFilterOn((ObjectInputStream) ois); if (stream instanceof VersionedDataStream) { - Version v = Version.getVersionForDataStream((VersionedDataStream) stream); + Version v = (Version) ((VersionedDataStream) stream).getVersion(); if (Version.CURRENT != v && v != null) { ois = new VersionedObjectInput(ois, v); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/Version.java b/geode-core/src/main/java/org/apache/geode/internal/Version.java index 75f6a59..71243ea 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/Version.java +++ b/geode-core/src/main/java/org/apache/geode/internal/Version.java @@ -18,15 +18,13 @@ package org.apache.geode.internal; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.stream.Collectors; import org.apache.geode.annotations.Immutable; import org.apache.geode.cache.UnsupportedVersionException; import org.apache.geode.internal.cache.tier.sockets.CommandInitializer; -import org.apache.geode.internal.serialization.VersionedDataStream; +import org.apache.geode.internal.serialization.SerializationVersion; /** * Enumerated type for client / server and p2p version. @@ -38,7 +36,7 @@ import org.apache.geode.internal.serialization.VersionedDataStream; * @since GemFire 5.7 */ @Immutable -public class Version implements Comparable<Version> { +public class Version extends SerializationVersion { /** The name of this version */ private final transient String name; @@ -55,20 +53,11 @@ public class Version implements Comparable<Version> { private final byte release; private final byte patch; - /** byte used as ordinal to represent this <code>Version</code> */ - private final short ordinal; - public static final int HIGHEST_VERSION = 107; @Immutable private static final Version[] VALUES = new Version[HIGHEST_VERSION + 1]; - /** - * Reserved token that cannot be used for product version but as a flag in internal contexts. - */ - private static final byte TOKEN_ORDINAL = -1; - private static final int TOKEN_ORDINAL_INT = (TOKEN_ORDINAL & 0xFF); - @Immutable public static final Version TOKEN = new Version("", "TOKEN", (byte) -1, (byte) 0, (byte) 0, (byte) 0, TOKEN_ORDINAL); @@ -309,13 +298,13 @@ public class Version implements Comparable<Version> { /** Creates a new instance of <code>Version</code> */ private Version(String product, String name, byte major, byte minor, byte release, byte patch, byte ordinal) { + super(ordinal); this.productName = product; this.name = name; this.majorVersion = major; this.minorVersion = minor; this.release = release; this.patch = patch; - this.ordinal = ordinal; this.methodSuffix = this.productName + "_" + this.majorVersion + "_" + this.minorVersion + "_" + this.release + "_" + this.patch; if (ordinal != TOKEN_ORDINAL) { @@ -357,42 +346,19 @@ public class Version implements Comparable<Version> { } /** - * Write the given ordinal (result of {@link #ordinal()}) to given {@link DataOutput}. This keeps - * the serialization of ordinal compatible with previous versions writing a single byte to - * DataOutput when possible, and a token with 2 bytes if it is large. - * - * @param out the {@link DataOutput} to write the ordinal write to - * @param ordinal the version to be written - * @param compressed if true, then use single byte for ordinal < 128, and three bytes for beyond - * that, else always use three bytes where the first byte is {@link #TOKEN_ORDINAL}; former - * mode is useful for interoperatibility with previous versions while latter to use fixed - * size for writing version; typically former will be used for P2P/client-server - * communications while latter for persisting to disk; we use the token to ensure that - * {@link #readOrdinal(DataInput)} can deal with both compressed/uncompressed cases - * seemlessly - */ - public static void writeOrdinal(DataOutput out, short ordinal, boolean compressed) - throws IOException { - if (compressed && ordinal <= Byte.MAX_VALUE) { - out.writeByte(ordinal); - } else { - out.writeByte(TOKEN_ORDINAL); - out.writeShort(ordinal); - } - } - - /** * Write this {@link Version}'s ordinal (result of {@link #ordinal()}) to given * {@link DataOutput}. This keeps the serialization of ordinal compatible with previous versions * writing a single byte to DataOutput when possible, and a token with 2 bytes if it is large. * * @param out the {@link DataOutput} to write the ordinal write to * @param compressed if true, then use single byte for ordinal < 128, and three bytes for beyond - * that, else always use three bytes where the first byte is {@link #TOKEN_ORDINAL}; former + * that, else always use three bytes where the first byte is + * {@link SerializationVersion#TOKEN_ORDINAL}; former * mode is useful for interoperatibility with previous versions while latter to use fixed * size for writing version; typically former will be used for P2P/client-server * communications while latter for persisting to disk; we use the token to ensure that - * {@link #readOrdinal(DataInput)} can deal with both compressed/uncompressed cases + * {@link SerializationVersion#readOrdinal(DataInput)} can deal with both + * compressed/uncompressed cases * seemlessly */ public void writeOrdinal(DataOutput out, boolean compressed) throws IOException { @@ -400,37 +366,6 @@ public class Version implements Comparable<Version> { } /** - * Write the given ordinal (result of {@link #ordinal()}) to given {@link ByteBuffer}. This keeps - * the serialization of ordinal compatible with previous versions writing a single byte to - * DataOutput when possible, and a token with 2 bytes if it is large. - * - * @param buffer the {@link ByteBuffer} to write the ordinal write to - * @param ordinal the version to be written - * @param compressed if true, then use single byte for ordinal < 128, and three bytes for beyond - * that, else always use three bytes where the first byte is {@link #TOKEN_ORDINAL} - */ - public static void writeOrdinal(ByteBuffer buffer, short ordinal, boolean compressed) { - if (compressed && ordinal <= Byte.MAX_VALUE) { - buffer.put((byte) ordinal); - } else { - buffer.put(TOKEN_ORDINAL); - buffer.putShort(ordinal); - } - } - - /** - * Reads ordinal as written by {@link #writeOrdinal} from given {@link DataInput}. - */ - public static short readOrdinal(DataInput in) throws IOException { - final byte ordinal = in.readByte(); - if (ordinal != TOKEN_ORDINAL) { - return ordinal; - } else { - return in.readShort(); - } - } - - /** * Return the <code>Version</code> reading from given {@link DataInput} as serialized by * {@link #writeOrdinal(DataOutput, boolean)}. * @@ -451,6 +386,7 @@ public class Version implements Comparable<Version> { return fromOrdinalNoThrow(readOrdinal(in), returnNullForCurrent); } + /** * Return the <code>Version</code> represented by specified ordinal while not throwing exception * if given ordinal is higher than any known ones or does not map to an actual Version instance @@ -466,30 +402,6 @@ public class Version implements Comparable<Version> { return VALUES[ordinal]; } - /** - * Reads ordinal as written by {@link #writeOrdinal} from given {@link InputStream}. Returns -1 on - * end of stream. - */ - public static short readOrdinalFromInputStream(InputStream is) throws IOException { - final int ordinal = is.read(); - if (ordinal != -1) { - if (ordinal != TOKEN_ORDINAL_INT) { - return (short) ordinal; - } else { - // two byte ordinal - final int ordinalPart1 = is.read(); - final int ordinalPart2 = is.read(); - if ((ordinalPart1 | ordinalPart2) >= 0) { - return (short) ((ordinalPart1 << 8) | ordinalPart2); - } else { - return -1; - } - } - } else { - return -1; - } - } - public String getMethodSuffix() { return this.methodSuffix; } @@ -548,21 +460,6 @@ public class Version implements Comparable<Version> { } /** - * {@inheritDoc} - */ - @Override - public int compareTo(Version other) { - if (other != null) { - // byte min/max can't overflow int, so use (a-b) - final int thisOrdinal = this.ordinal; - final int otherOrdinal = other.ordinal; - return (thisOrdinal - otherOrdinal); - } else { - return 1; - } - } - - /** * Returns a string representation for this <code>Version</code>. * * @return the name of this operation. @@ -622,15 +519,4 @@ public class Version implements Comparable<Version> { .collect(Collectors.toList()); } - public static Version getVersionForDataStream(VersionedDataStream input) { - try { - short ordinal = input.getVersionOrdinal(); - if (ordinal <= 0) { - return null; - } - return fromOrdinal(ordinal, false); - } catch (UnsupportedVersionException e) { - return null; - } - } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectInput.java b/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectInput.java index 73b79d7..8ef654f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectInput.java +++ b/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectInput.java @@ -18,6 +18,7 @@ package org.apache.geode.internal; import java.io.IOException; import java.io.ObjectInput; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.serialization.VersionedDataStream; /** @@ -46,11 +47,8 @@ public class VersionedObjectInput implements ObjectInput, VersionedDataStream { * {@inheritDoc} */ @Override - public short getVersionOrdinal() { - if (version == null) { - return Version.CURRENT_ORDINAL; - } - return this.version.ordinal(); + public SerializationVersion getVersion() { + return this.version; } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectOutput.java b/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectOutput.java index e8d44ff..8d025d8 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectOutput.java +++ b/geode-core/src/main/java/org/apache/geode/internal/VersionedObjectOutput.java @@ -18,6 +18,7 @@ package org.apache.geode.internal; import java.io.IOException; import java.io.ObjectOutput; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.serialization.VersionedDataStream; /** @@ -49,11 +50,8 @@ public class VersionedObjectOutput implements ObjectOutput, VersionedDataStream * {@inheritDoc} */ @Override - public short getVersionOrdinal() { - if (version == null) { - return Version.CURRENT_ORDINAL; - } - return this.version.ordinal(); + public SerializationVersion getVersion() { + return this.version; } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java index 7cf2d51..67affe0 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/DiskInitFile.java @@ -81,6 +81,7 @@ import org.apache.geode.internal.cache.versions.RegionVersionVector; import org.apache.geode.internal.concurrent.ConcurrentHashSet; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LogMarker; +import org.apache.geode.internal.serialization.SerializationVersion; /** * Does all the IF file work for a DiskStoreImpl. @@ -2794,7 +2795,7 @@ public class DiskInitFile implements DiskInitFileInterpreter { try { ByteBuffer bb = getIFWriteBuffer(1 + 3 + 1); bb.put(IFREC_GEMFIRE_VERSION); - Version.writeOrdinal(bb, version.ordinal(), false); + SerializationVersion.writeOrdinal(bb, version.ordinal(), false); bb.put(END_OF_RECORD_ID); writeIFRecord(bb, false); // don't do stats for these small records } catch (IOException ex) { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java index 5d26919..5e08365 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/EventID.java @@ -315,7 +315,7 @@ public class EventID implements DataSerializableFixedID, Serializable, Externali disVersion = Version.GFE_90; } ByteArrayDataInput dis = - new ByteArrayDataInput(membershipID, disVersion == null ? 0 : disVersion.ordinal()); + new ByteArrayDataInput(membershipID, disVersion); InternalDistributedMember result = null; try { result = InternalDistributedMember.readEssentialData(dis); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java index 9a33a06..f227aba 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/FilterRoutingInfo.java @@ -486,7 +486,7 @@ public class FilterRoutingInfo implements VersionedDataSerializable { private void deserialize() { try { ByteArrayDataInput dis = - new ByteArrayDataInput(myData, myDataVersion == null ? 0 : myDataVersion.ordinal()); + new ByteArrayDataInput(myData, myDataVersion); boolean hasCQs = dis.readBoolean(); if (hasCQs) { int numEntries = InternalDataSerializer.readArrayLength(dis); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java index 0675ecc..035f61d 100755 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/Oplog.java @@ -108,6 +108,7 @@ import org.apache.geode.internal.offheap.annotations.Released; import org.apache.geode.internal.offheap.annotations.Retained; import org.apache.geode.internal.sequencelog.EntryLogger; import org.apache.geode.internal.serialization.ByteArrayDataInput; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.shared.NativeCalls; import org.apache.geode.internal.util.BlobHelper; import org.apache.geode.pdx.internal.PdxWriterImpl; @@ -2071,7 +2072,7 @@ public class Oplog implements CompactableOplog, Flushable { private Version readProductVersionRecord(DataInput dis, File f) throws IOException { Version recoveredGFVersion; - short ver = Version.readOrdinal(dis); + short ver = SerializationVersion.readOrdinal(dis); try { recoveredGFVersion = Version.fromOrdinal(ver, false); } catch (UnsupportedVersionException e) { @@ -6362,7 +6363,8 @@ public class Oplog implements CompactableOplog, Flushable { } /** - * If this OpLog is from an older version of the product, then return that {@link Version} else + * If this OpLog is from an older version of the product, then return that + * {@link org.apache.geode.internal.Version} else * return null. */ public Version getProductVersionIfOld() { @@ -6385,7 +6387,7 @@ public class Oplog implements CompactableOplog, Flushable { /** * If this OpLog has data that was written by an older version of the product, then return that - * {@link Version} else return null. + * {@link org.apache.geode.internal.Version} else return null. */ public Version getDataVersionIfOld() { final Version version = this.dataVersion; @@ -6574,7 +6576,7 @@ public class Oplog implements CompactableOplog, Flushable { flushNoSync(olf); } // don't compress since we setup fixed size of buffers - Version.writeOrdinal(bb, ordinal, false); + SerializationVersion.writeOrdinal(bb, ordinal, false); } private void writeInt(OplogFile olf, int v) throws IOException { diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskInitFileParser.java b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskInitFileParser.java index 5350a90..d595a92 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskInitFileParser.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/persistence/DiskInitFileParser.java @@ -45,6 +45,7 @@ import org.apache.geode.internal.cache.ProxyBucketRegion; import org.apache.geode.internal.cache.versions.RegionVersionHolder; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.logging.log4j.LogMarker; +import org.apache.geode.internal.serialization.SerializationVersion; public class DiskInitFileParser { private static final Logger logger = LogService.getLogger(); @@ -426,7 +427,7 @@ public class DiskInitFileParser { } break; case DiskInitFile.IFREC_GEMFIRE_VERSION: { - short ver = Version.readOrdinal(dis); + short ver = SerializationVersion.readOrdinal(dis); readEndOfRecord(dis); if (logger.isTraceEnabled(LogMarker.PERSIST_RECOVERY_VERBOSE)) { logger.trace(LogMarker.PERSIST_RECOVERY_VERBOSE, "IFREC_GEMFIRE_VERSION version={}", diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationMetadata.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationMetadata.java index 2a6e073..637f2c6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationMetadata.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ClientRegistrationMetadata.java @@ -29,6 +29,7 @@ import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.CommunicationMode; import org.apache.geode.internal.logging.LogService; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.serialization.VersionedDataInputStream; import org.apache.geode.internal.serialization.VersionedDataOutputStream; @@ -58,9 +59,9 @@ class ClientRegistrationMetadata { unversionedDataOutputStream)) { if (oldClientRequiresVersionedStreams(clientVersion)) { dataInputStream = - new VersionedDataInputStream(unversionedDataInputStream, clientVersion.ordinal()); + new VersionedDataInputStream(unversionedDataInputStream, clientVersion); dataOutputStream = - new VersionedDataOutputStream(unversionedDataOutputStream, clientVersion.ordinal()); + new VersionedDataOutputStream(unversionedDataOutputStream, clientVersion); } else { dataInputStream = unversionedDataInputStream; dataOutputStream = unversionedDataOutputStream; @@ -113,7 +114,7 @@ class ClientRegistrationMetadata { private boolean getAndValidateClientVersion(final Socket socket, final DataInputStream dataInputStream, final DataOutputStream dataOutputStream) throws IOException { - short clientVersionOrdinal = Version.readOrdinal(dataInputStream); + short clientVersionOrdinal = SerializationVersion.readOrdinal(dataInputStream); try { clientVersion = Version.fromOrdinal(clientVersionOrdinal, true); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeFactory.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeFactory.java index 387c741..5f793e2 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeFactory.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeFactory.java @@ -32,6 +32,7 @@ import org.apache.geode.internal.cache.tier.CommunicationMode; import org.apache.geode.internal.cache.tier.ServerSideHandshake; import org.apache.geode.internal.logging.LogService; import org.apache.geode.internal.security.SecurityService; +import org.apache.geode.internal.serialization.SerializationVersion; class ServerSideHandshakeFactory { private static final Logger logger = LogService.getLogger(); @@ -64,7 +65,7 @@ class ServerSideHandshakeFactory { soTimeout = socket.getSoTimeout(); socket.setSoTimeout(timeout); InputStream is = socket.getInputStream(); - short clientVersionOrdinal = Version.readOrdinalFromInputStream(is); + short clientVersionOrdinal = SerializationVersion.readOrdinalFromInputStream(is); if (clientVersionOrdinal == -1) { throw new EOFException( "HandShakeReader: EOF reached before client version could be read"); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java index c4a06b9..624cde3 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/tier/sockets/ServerSideHandshakeImpl.java @@ -35,6 +35,7 @@ import org.apache.geode.internal.cache.tier.CommunicationMode; import org.apache.geode.internal.cache.tier.Encryptor; import org.apache.geode.internal.cache.tier.ServerSideHandshake; import org.apache.geode.internal.security.SecurityService; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.serialization.VersionedDataInputStream; import org.apache.geode.internal.serialization.VersionedDataOutputStream; import org.apache.geode.internal.serialization.VersionedDataStream; @@ -87,9 +88,9 @@ public class ServerSideHandshakeImpl extends Handshake implements ServerSideHand this.clientReadTimeout = dataInputStream.readInt(); if (clientVersion.compareTo(Version.CURRENT) < 0) { // versioned streams allow object serialization code to deal with older clients - dataInputStream = new VersionedDataInputStream(dataInputStream, clientVersion.ordinal()); + dataInputStream = new VersionedDataInputStream(dataInputStream, clientVersion); dataOutputStream = - new VersionedDataOutputStream(dataOutputStream, clientVersion.ordinal()); + new VersionedDataOutputStream(dataOutputStream, clientVersion); } this.id = ClientProxyMembershipID.readCanonicalized(dataInputStream); // Note: credentials should always be the last piece in handshake for @@ -134,8 +135,8 @@ public class ServerSideHandshakeImpl extends Handshake implements ServerSideHand DataOutputStream dos = new DataOutputStream(out); DataInputStream dis; if (clientVersion.compareTo(Version.CURRENT) < 0) { - dis = new VersionedDataInputStream(in, clientVersion.ordinal()); - dos = new VersionedDataOutputStream(dos, clientVersion.ordinal()); + dis = new VersionedDataInputStream(in, clientVersion); + dos = new VersionedDataOutputStream(dos, clientVersion); } else { dis = new DataInputStream(in); } @@ -149,7 +150,7 @@ public class ServerSideHandshakeImpl extends Handshake implements ServerSideHand // additional byte of wan site needs to send for Gateway BC if (communicationMode.isWAN()) { - Version.writeOrdinal(dos, currentServerVersion.ordinal(), true); + SerializationVersion.writeOrdinal(dos, currentServerVersion.ordinal(), true); } dos.writeByte(endpointType); @@ -160,7 +161,7 @@ public class ServerSideHandshakeImpl extends Handshake implements ServerSideHand Version v = Version.CURRENT; if (dos instanceof VersionedDataStream) { - v = Version.getVersionForDataStream((VersionedDataStream) dos); + v = (Version) ((VersionedDataStream) dos).getVersion(); } HeapDataOutputStream hdos = new HeapDataOutputStream(v); DataSerializer.writeObject(member, hdos); @@ -171,7 +172,8 @@ public class ServerSideHandshakeImpl extends Handshake implements ServerSideHand dos.writeUTF(""); // Write delta-propagation property value if this is not WAN. - if (!communicationMode.isWAN() && this.clientVersion.compareTo(Version.GFE_61) >= 0) { + if (!communicationMode.isWAN() && this.clientVersion.compareTo( + Version.GFE_61) >= 0) { dos.writeBoolean(((InternalDistributedSystem) this.system).getConfig().getDeltaPropagation()); } @@ -189,7 +191,8 @@ public class ServerSideHandshakeImpl extends Handshake implements ServerSideHand .getDistributedSystemId()); } - if ((communicationMode.isWAN()) && this.clientVersion.compareTo(Version.GFE_80) >= 0 + if ((communicationMode.isWAN()) && this.clientVersion.compareTo( + Version.GFE_80) >= 0 && currentServerVersion.compareTo(Version.GFE_80) >= 0) { int pdxSize = PeerTypeRegistration.getPdxRegistrySize(); dos.writeInt(pdxSize); diff --git a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java index d32fd50..a9abf34 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java +++ b/geode-core/src/main/java/org/apache/geode/internal/cache/wan/GatewaySenderEventImpl.java @@ -735,7 +735,7 @@ public class GatewaySenderEventImpl // this._id = in.readUTF(); if (version < 0x11 && (in instanceof InputStream) && InternalDataSerializer.getVersionForDataStream(in) == Version.CURRENT) { - in = new VersionedDataInputStream((InputStream) in, Version.GFE_701.ordinal()); + in = new VersionedDataInputStream((InputStream) in, Version.GFE_701); } this.id = (EventID) DataSerializer.readObject(in); // TODO:Asif ; Check if this violates Barry's logic of not assiging VM diff --git a/geode-core/src/main/java/org/apache/geode/internal/serialization/BufferDataOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/serialization/BufferDataOutputStream.java index 20f336b..245b286 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/serialization/BufferDataOutputStream.java +++ b/geode-core/src/main/java/org/apache/geode/internal/serialization/BufferDataOutputStream.java @@ -43,7 +43,7 @@ public class BufferDataOutputStream extends OutputStream implements VersionedDat protected LinkedList<ByteBuffer> chunks = null; protected int size = 0; protected boolean ignoreWrites = false; // added for bug 39569 - protected short version; + protected SerializationVersion version; protected boolean doNotCopy; protected ByteBuffer buffer; /** @@ -55,7 +55,7 @@ public class BufferDataOutputStream extends OutputStream implements VersionedDat private Error expansionException = null; private int memoPosition; - public BufferDataOutputStream(int allocSize, short version) { + public BufferDataOutputStream(int allocSize, SerializationVersion version) { this(allocSize, version, false); } @@ -80,7 +80,7 @@ public class BufferDataOutputStream extends OutputStream implements VersionedDat * @param doNotCopy if true then byte arrays/buffers/sources will not be copied to this hdos but * instead referenced. */ - public BufferDataOutputStream(int allocSize, short version, boolean doNotCopy) { + public BufferDataOutputStream(int allocSize, SerializationVersion version, boolean doNotCopy) { if (allocSize < SMALLEST_CHUNK_SIZE) { this.MIN_CHUNK_SIZE = SMALLEST_CHUNK_SIZE; } else { @@ -91,7 +91,8 @@ public class BufferDataOutputStream extends OutputStream implements VersionedDat this.doNotCopy = doNotCopy; } - public BufferDataOutputStream(ByteBuffer initialBuffer, short version, boolean doNotCopy) { + public BufferDataOutputStream(ByteBuffer initialBuffer, SerializationVersion version, + boolean doNotCopy) { if (initialBuffer.position() != 0) { initialBuffer = initialBuffer.slice(); } @@ -151,7 +152,7 @@ public class BufferDataOutputStream extends OutputStream implements VersionedDat * {@inheritDoc} */ @Override - public short getVersionOrdinal() { + public SerializationVersion getVersion() { return version; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/serialization/ByteArrayDataInput.java b/geode-core/src/main/java/org/apache/geode/internal/serialization/ByteArrayDataInput.java index 20d5f2f..23fff7a 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/serialization/ByteArrayDataInput.java +++ b/geode-core/src/main/java/org/apache/geode/internal/serialization/ByteArrayDataInput.java @@ -35,7 +35,7 @@ public class ByteArrayDataInput extends InputStream implements DataInput, Versio private int pos; /** reusable buffer for readUTF */ private char[] charBuf; - private short version; + private SerializationVersion version; /** * Create a {@link DataInput} whose contents are empty. @@ -43,10 +43,10 @@ public class ByteArrayDataInput extends InputStream implements DataInput, Versio public ByteArrayDataInput() {} public ByteArrayDataInput(byte[] bytes) { - initialize(bytes, (short) 0); + initialize(bytes, null); } - public ByteArrayDataInput(byte[] bytes, short version) { + public ByteArrayDataInput(byte[] bytes, SerializationVersion version) { initialize(bytes, version); } @@ -57,18 +57,18 @@ public class ByteArrayDataInput extends InputStream implements DataInput, Versio * (a copy is not made) so it should not be changed externally. * @param version the product version that serialized the object on given bytes */ - public void initialize(byte[] bytes, int version) { + public void initialize(byte[] bytes, SerializationVersion version) { this.bytes = bytes; this.nBytes = bytes.length; this.pos = 0; - this.version = (short) version; + this.version = version; } /** * {@inheritDoc} */ @Override - public short getVersionOrdinal() { + public SerializationVersion getVersion() { return version; } @@ -463,7 +463,7 @@ public class ByteArrayDataInput extends InputStream implements DataInput, Versio this.bytes = null; this.nBytes = 0; this.pos = 0; - this.version = 0; + this.version = null; } /** @@ -471,7 +471,8 @@ public class ByteArrayDataInput extends InputStream implements DataInput, Versio */ @Override public String toString() { - return this.version == 0 ? super.toString() : (super.toString() + " (v" + this.version + ')'); + return this.version == null ? super.toString() + : (super.toString() + " (v" + this.version + ')'); } private void throwUTFEncodingError(int index, int char1, int char2, Integer char3, int enc) diff --git a/geode-core/src/main/java/org/apache/geode/internal/serialization/DSFIDSerializer.java b/geode-core/src/main/java/org/apache/geode/internal/serialization/DSFIDSerializer.java index 37cd5a7..d538a02 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/serialization/DSFIDSerializer.java +++ b/geode-core/src/main/java/org/apache/geode/internal/serialization/DSFIDSerializer.java @@ -129,11 +129,11 @@ public class DSFIDSerializer { if (Version.CURRENT != v && v != null) { // get versions where DataOutput was upgraded SerializationVersions sv = (SerializationVersions) ds; - Version[] versions = sv.getSerializationVersions(); + SerializationVersion[] versions = sv.getSerializationVersions(); // check if the version of the peer or diskstore is different and // there has been a change in the message if (versions != null) { - for (Version version : versions) { + for (SerializationVersion version : versions) { // if peer version is less than the greatest upgraded version if (v.compareTo(version) < 0) { ds.getClass().getMethod("toDataPre_" + version.getMethodSuffix(), @@ -181,13 +181,12 @@ public class DSFIDSerializer { * Returns * zero if the version is same as this member's. */ - public short getVersionForDataStreamOrZero(DataOutput out) { + public SerializationVersion getVersionForDataStreamOrNull(DataOutput out) { // check if this is a versioned data output if (out instanceof VersionedDataStream) { - return ((VersionedDataStream) out).getVersionOrdinal(); + return ((VersionedDataStream) out).getVersion(); } else { - // assume latest version - return 0; + return null; } } @@ -262,13 +261,13 @@ public class DSFIDSerializer { Version v = InternalDataSerializer.getVersionForDataStreamOrNull(in); if (Version.CURRENT != v && v != null) { // get versions where DataOutput was upgraded - Version[] versions = null; + SerializationVersion[] versions = null; SerializationVersions vds = (SerializationVersions) ds; versions = vds.getSerializationVersions(); // check if the version of the peer or diskstore is different and // there has been a change in the message if (versions != null) { - for (Version version : versions) { + for (SerializationVersion version : versions) { // if peer version is less than the greatest upgraded version if (v.compareTo(version) < 0) { ds.getClass().getMethod("fromDataPre" + '_' + version.getMethodSuffix(), diff --git a/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersion.java b/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersion.java new file mode 100644 index 0000000..4f0cbda --- /dev/null +++ b/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersion.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.apache.geode.internal.serialization; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; + +public class SerializationVersion implements Comparable<SerializationVersion> { + + /** + * Reserved token that cannot be used for product version but as a flag in internal contexts. + */ + protected static final byte TOKEN_ORDINAL = -1; + protected static final int TOKEN_ORDINAL_INT = (TOKEN_ORDINAL & 0xFF); + + /** value used as ordinal to represent this <code>SerializationVersion</code> */ + protected final short ordinal; + + public SerializationVersion(int ordinal) { + this.ordinal = (short) ordinal; + } + + /** + * Write the given ordinal (result of ordinal()) to given {@link DataOutput}. This keeps + * the serialization of ordinal compatible with previous versions writing a single byte to + * DataOutput when possible, and a token with 2 bytes if it is large. + * + * @param out the {@link DataOutput} to write the ordinal write to + * @param ordinal the version to be written + * @param compressed if true, then use single byte for ordinal < 128, and three bytes for beyond + * that, else always use three bytes where the first byte is {@link #TOKEN_ORDINAL}; former + * mode is useful for interoperatibility with previous versions while latter to use fixed + * size for writing version; typically former will be used for P2P/client-server + * communications while latter for persisting to disk; we use the token to ensure that + * {@link #readOrdinal(DataInput)} can deal with both compressed/uncompressed cases + * seemlessly + */ + public static void writeOrdinal(DataOutput out, short ordinal, boolean compressed) + throws IOException { + if (compressed && ordinal <= Byte.MAX_VALUE) { + out.writeByte(ordinal); + } else { + out.writeByte(TOKEN_ORDINAL); + out.writeShort(ordinal); + } + } + + /** + * Write the given ordinal (result of ordinal()) to given {@link ByteBuffer}. This keeps + * the serialization of ordinal compatible with previous versions writing a single byte to + * DataOutput when possible, and a token with 2 bytes if it is large. + * + * @param buffer the {@link ByteBuffer} to write the ordinal write to + * @param ordinal the version to be written + * @param compressed if true, then use single byte for ordinal < 128, and three bytes for beyond + * that, else always use three bytes where the first byte is {@link #TOKEN_ORDINAL} + */ + public static void writeOrdinal(ByteBuffer buffer, short ordinal, boolean compressed) { + if (compressed && ordinal <= Byte.MAX_VALUE) { + buffer.put((byte) ordinal); + } else { + buffer.put(TOKEN_ORDINAL); + buffer.putShort(ordinal); + } + } + + /** + * Reads ordinal as written by {@link #writeOrdinal} from given {@link DataInput}. + */ + public static short readOrdinal(DataInput in) throws IOException { + final byte ordinal = in.readByte(); + if (ordinal != TOKEN_ORDINAL) { + return ordinal; + } else { + return in.readShort(); + } + } + + /** + * Reads ordinal as written by writeOrdinal from given InputStream. Returns -1 on + * end of stream. + */ + public static short readOrdinalFromInputStream(InputStream is) throws IOException { + final int ordinal = is.read(); + if (ordinal != -1) { + if (ordinal != TOKEN_ORDINAL_INT) { + return (short) ordinal; + } else { + // two byte ordinal + final int ordinalPart1 = is.read(); + final int ordinalPart2 = is.read(); + if ((ordinalPart1 | ordinalPart2) >= 0) { + return (short) ((ordinalPart1 << 8) | ordinalPart2); + } else { + return -1; + } + } + } else { + return -1; + } + } + + public String getMethodSuffix() { + return "" + ordinal; + } + + @Override + public String toString() { + return "Version{" + + "ordinal=" + ordinal + + '}'; + } + + @Override + public int compareTo(SerializationVersion other) { + if (other != null) { + // byte min/max can't overflow int, so use (a-b) + final int thisOrdinal = this.ordinal; + final int otherOrdinal = other.ordinal; + return (thisOrdinal - otherOrdinal); + } else { + return 1; + } + } +} diff --git a/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersions.java b/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersions.java index 1d9d2d3..5eeaf02 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersions.java +++ b/geode-core/src/main/java/org/apache/geode/internal/serialization/SerializationVersions.java @@ -14,8 +14,6 @@ */ package org.apache.geode.internal.serialization; -import org.apache.geode.internal.Version; - /** * This interface is extended by DataSerializableFixedID and VersionedDataSerializable in order to * furnish version information to the serialization infrastructure for backward compatibility @@ -34,6 +32,6 @@ public interface SerializationVersions { * The method name is formed with the version's product name and its major, minor, release and * patch numbers. */ - Version[] getSerializationVersions(); + SerializationVersion[] getSerializationVersions(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataInputStream.java b/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataInputStream.java index ff0fc2f..b9f0b65 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataInputStream.java +++ b/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataInputStream.java @@ -28,7 +28,7 @@ import java.io.InputStream; */ public class VersionedDataInputStream extends DataInputStream implements VersionedDataStream { - private final short version; + private final SerializationVersion version; /** * Creates a VersionedDataInputStream that uses the specified underlying InputStream. @@ -36,7 +36,7 @@ public class VersionedDataInputStream extends DataInputStream implements Version * @param in the specified input stream * @param version the product version that serialized object on the given input stream */ - public VersionedDataInputStream(InputStream in, short version) { + public VersionedDataInputStream(InputStream in, SerializationVersion version) { super(in); this.version = version; } @@ -45,7 +45,7 @@ public class VersionedDataInputStream extends DataInputStream implements Version * {@inheritDoc} */ @Override - public short getVersionOrdinal() { + public SerializationVersion getVersion() { return this.version; } diff --git a/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataOutputStream.java b/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataOutputStream.java index ddbba6e..9e582f6 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataOutputStream.java +++ b/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataOutputStream.java @@ -26,7 +26,7 @@ import java.io.OutputStream; */ public class VersionedDataOutputStream extends DataOutputStream implements VersionedDataStream { - private final short version; + private SerializationVersion version; /** * Creates a VersionedDataOutputStream that wraps the specified underlying OutputStream. @@ -34,7 +34,7 @@ public class VersionedDataOutputStream extends DataOutputStream implements Versi * @param out the underlying output stream * @param version the product version that serialized object on the given {@link OutputStream} */ - public VersionedDataOutputStream(OutputStream out, short version) { + public VersionedDataOutputStream(OutputStream out, SerializationVersion version) { super(out); this.version = version; } @@ -43,7 +43,7 @@ public class VersionedDataOutputStream extends DataOutputStream implements Versi * {@inheritDoc} */ @Override - public short getVersionOrdinal() { + public SerializationVersion getVersion() { return this.version; } } diff --git a/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataStream.java b/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataStream.java index d7f7b7a..93dbca9 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataStream.java +++ b/geode-core/src/main/java/org/apache/geode/internal/serialization/VersionedDataStream.java @@ -37,10 +37,10 @@ public interface VersionedDataStream { /** * If the remote peer to which this input/output is connected has a lower version that this - * member, then this returns the {@link Version} ordinal of the peer else 0. If the peer has a + * member, then this returns the {@link Version} of the peer else null. If the peer has a * higher * {@link Version}, then this member cannot do any adjustment to serialization and its the remote * peer's responsibility to adjust the serialization/deserialization according to this peer. */ - short getVersionOrdinal(); + SerializationVersion getVersion(); } diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java index 60113e8..ef3c430 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/Connection.java @@ -80,6 +80,7 @@ import org.apache.geode.internal.net.BufferPool; import org.apache.geode.internal.net.NioFilter; import org.apache.geode.internal.net.NioPlainEngine; import org.apache.geode.internal.net.SocketCreator; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.tcp.MsgReader.Header; import org.apache.geode.internal.util.concurrent.ReentrantSemaphore; @@ -651,7 +652,8 @@ public class Connection implements Runnable { bb.putInt(cfg.getAsyncQueueTimeout()); bb.putInt(cfg.getAsyncMaxQueueSize()); // write own product version - Version.writeOrdinal(bb, Version.CURRENT.ordinal(), true); + SerializationVersion + .writeOrdinal(bb, Version.CURRENT.ordinal(), true); // now set the msg length into position 0 bb.putInt(0, calcHdrSize(bb.position() - MSG_HEADER_BYTES)); my_okHandshakeBuf = bb; diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgDestreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgDestreamer.java index 814ce1f..8de1b3f 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgDestreamer.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/MsgDestreamer.java @@ -240,7 +240,7 @@ public class MsgDestreamer { final Version v = version; DataInputStream dis = v == null ? new DataInputStream(this.is) - : new VersionedDataInputStream(this.is, v.ordinal()); + : new VersionedDataInputStream(this.is, v); long startSer = this.stats.startMsgDeserialization(); setResult((DistributionMessage) InternalDataSerializer.readDSFID(dis)); this.stats.endMsgDeserialization(startSer); diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedByteBufferInputStream.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedByteBufferInputStream.java index 0945afe..d77686b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedByteBufferInputStream.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedByteBufferInputStream.java @@ -18,6 +18,7 @@ package org.apache.geode.internal.tcp; import java.nio.ByteBuffer; import org.apache.geode.internal.Version; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.serialization.VersionedDataStream; /** @@ -57,11 +58,8 @@ public class VersionedByteBufferInputStream extends ByteBufferInputStream * {@inheritDoc} */ @Override - public short getVersionOrdinal() { - if (version == null) { - return Version.CURRENT_ORDINAL; - } - return this.version.ordinal(); + public SerializationVersion getVersion() { + return this.version; } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java index 9eed78c..020c90b 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java +++ b/geode-core/src/main/java/org/apache/geode/internal/tcp/VersionedMsgStreamer.java @@ -21,6 +21,7 @@ import org.apache.geode.distributed.internal.DMStats; import org.apache.geode.distributed.internal.DistributionMessage; import org.apache.geode.internal.Version; import org.apache.geode.internal.net.BufferPool; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.serialization.VersionedDataStream; /** @@ -42,11 +43,8 @@ class VersionedMsgStreamer extends MsgStreamer implements VersionedDataStream { * {@inheritDoc} */ @Override - public short getVersionOrdinal() { - if (version == null) { - return Version.CURRENT_ORDINAL; - } - return this.version.ordinal(); + public SerializationVersion getVersion() { + return this.version; } /** diff --git a/geode-core/src/main/java/org/apache/geode/internal/util/BlobHelper.java b/geode-core/src/main/java/org/apache/geode/internal/util/BlobHelper.java index f045151..82b2ee7 100644 --- a/geode-core/src/main/java/org/apache/geode/internal/util/BlobHelper.java +++ b/geode-core/src/main/java/org/apache/geode/internal/util/BlobHelper.java @@ -95,7 +95,7 @@ public class BlobHelper { if (in == null) { in = new ByteArrayDataInput(); } - in.initialize(blob, version == null ? 0 : version.ordinal()); + in.initialize(blob, version); result = DataSerializer.readObject(in); } endDeserialization(start, blob.length); diff --git a/geode-core/src/test/java/org/apache/geode/DataSerializerTest.java b/geode-core/src/test/java/org/apache/geode/DataSerializerTest.java index 823ee30..5ae74a7 100644 --- a/geode-core/src/test/java/org/apache/geode/DataSerializerTest.java +++ b/geode-core/src/test/java/org/apache/geode/DataSerializerTest.java @@ -57,9 +57,9 @@ public class DataSerializerTest { public void readStringShouldReturnCanonicalEmptyString() throws IOException { byte[] serializedEmptyStringBytes = BlobHelper.serializeToBlob(""); ByteArrayDataInput dataInput1 = new ByteArrayDataInput(); - dataInput1.initialize(serializedEmptyStringBytes, 0); + dataInput1.initialize(serializedEmptyStringBytes, null); ByteArrayDataInput dataInput2 = new ByteArrayDataInput(); - dataInput2.initialize(serializedEmptyStringBytes, 0); + dataInput2.initialize(serializedEmptyStringBytes, null); String firstRead = DataSerializer.readString(dataInput1); String secondRead = DataSerializer.readString(dataInput2); diff --git a/geode-core/src/test/java/org/apache/geode/cache/execute/FunctionAdapterJUnitTest.java b/geode-core/src/test/java/org/apache/geode/cache/execute/FunctionAdapterJUnitTest.java index 734c9e9..cbc2ca5 100644 --- a/geode-core/src/test/java/org/apache/geode/cache/execute/FunctionAdapterJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/cache/execute/FunctionAdapterJUnitTest.java @@ -77,7 +77,7 @@ public class FunctionAdapterJUnitTest { + "serializedFunctionAdapterWithDifferentSerialVersionUID.ser").getAbsolutePath()); DataInputStream dis = - new VersionedDataInputStream(new DataInputStream(fis), Version.GFE_82.ordinal()); + new VersionedDataInputStream(new DataInputStream(fis), Version.GFE_82); Object o = InternalDataSerializer.basicReadObject(dis); assertTrue(o instanceof FunctionAdapter); } diff --git a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberJUnitTest.java b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberJUnitTest.java index 29f7dac..7c41ee5 100644 --- a/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/distributed/internal/membership/gms/GMSMemberJUnitTest.java @@ -202,7 +202,7 @@ public class GMSMemberJUnitTest { dataOutput = new HeapDataOutputStream(Version.GFE_90); member.writeEssentialData(dataOutput); bais = new ByteArrayInputStream(baos.toByteArray()); - dataInput = new VersionedDataInputStream(new DataInputStream(bais), Version.GFE_90.ordinal()); + dataInput = new VersionedDataInputStream(new DataInputStream(bais), Version.GFE_90); newMember = new GMSMember(); newMember.readEssentialData(dataInput); assertEquals(0, newMember.getVmKind()); diff --git a/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java b/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java index 015f949..aa71e0c 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/ByteArrayDataInputTest.java @@ -239,7 +239,7 @@ public class ByteArrayDataInputTest { return new java.io.DataInputStream(new java.io.ByteArrayInputStream(inputBytes)); } else { ByteArrayDataInput input = new ByteArrayDataInput(); - input.initialize(inputBytes, 0); + input.initialize(inputBytes, null); return input; } } diff --git a/geode-core/src/test/java/org/apache/geode/internal/DataSerializableJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/DataSerializableJUnitTest.java index bb53fe4..d731d97 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/DataSerializableJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/DataSerializableJUnitTest.java @@ -1165,13 +1165,13 @@ public class DataSerializableJUnitTest implements Serializable { VersionedDataSerializableImpl ds = new VersionedDataSerializableImpl(getRandom()); VersionedDataOutputStream v = - new VersionedDataOutputStream(this.baos, Version.GFE_70.ordinal()); + new VersionedDataOutputStream(this.baos, Version.GFE_70); DataSerializer.writeObject(ds, v); v.flush(); ByteBuffer bb = ByteBuffer.wrap(this.baos.toByteArray()); ByteBufferInputStream bbis = new ByteBufferInputStream(bb); - VersionedDataInputStream vin = new VersionedDataInputStream(bbis, Version.GFE_70.ordinal()); + VersionedDataInputStream vin = new VersionedDataInputStream(bbis, Version.GFE_70); VersionedDataSerializableImpl ds2 = (VersionedDataSerializableImpl) DataSerializer.readObject(vin); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java index 014f94d..067fee2 100644 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/FilterInfoTest.java @@ -52,7 +52,7 @@ public class FilterInfoTest { byte[] outputBytes = dataOut.toByteArray(); FilterInfo deserialized = new FilterInfo(); ByteArrayDataInput dataInput = new ByteArrayDataInput(); - dataInput.initialize(outputBytes, Version.CURRENT_ORDINAL); + dataInput.initialize(outputBytes, Version.CURRENT); deserialized.fromData(dataInput); assertThat(deserialized.getCQs()).isEqualTo(cqs); assertThat(deserialized.getInterestedClients()).isEqualTo(clients); diff --git a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/EventIdOptimizationJUnitTest.java b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/EventIdOptimizationJUnitTest.java index d185e9d..a5ea15a 100755 --- a/geode-core/src/test/java/org/apache/geode/internal/cache/ha/EventIdOptimizationJUnitTest.java +++ b/geode-core/src/test/java/org/apache/geode/internal/cache/ha/EventIdOptimizationJUnitTest.java @@ -180,7 +180,7 @@ public class EventIdOptimizationJUnitTest { HeapDataOutputStream hdos90 = new HeapDataOutputStream(256, Version.GFE_90); - VersionedDataOutputStream dop = new VersionedDataOutputStream(hdos90, Version.GFE_90.ordinal()); + VersionedDataOutputStream dop = new VersionedDataOutputStream(hdos90, Version.GFE_90); eventID.toData(dop); @@ -188,7 +188,7 @@ public class EventIdOptimizationJUnitTest { VersionedDataInputStream dataInputStream = - new VersionedDataInputStream(bais, Version.GFE_90.ordinal()); + new VersionedDataInputStream(bais, Version.GFE_90); EventID eventID2 = new EventID(); eventID2.fromData(dataInputStream); diff --git a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/results/PageEntryJUnitTest.java b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/results/PageEntryJUnitTest.java index 88f9e4e..2b84108 100644 --- a/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/results/PageEntryJUnitTest.java +++ b/geode-lucene/src/test/java/org/apache/geode/cache/lucene/internal/results/PageEntryJUnitTest.java @@ -84,7 +84,7 @@ public class PageEntryJUnitTest { entry.toData(out); final byte[] bytes = out.toByteArray(); ByteArrayDataInput in = new ByteArrayDataInput(); - in.initialize(bytes, (short) 0); + in.initialize(bytes, null); PageEntry newEntry = new PageEntry(); newEntry.fromData(in); return newEntry; diff --git a/geode-old-client-support/src/distributedTest/java/org/apache/geode/OldClientSupportDUnitTest.java b/geode-old-client-support/src/distributedTest/java/org/apache/geode/OldClientSupportDUnitTest.java index 4d21631..d2e9ff1 100644 --- a/geode-old-client-support/src/distributedTest/java/org/apache/geode/OldClientSupportDUnitTest.java +++ b/geode-old-client-support/src/distributedTest/java/org/apache/geode/OldClientSupportDUnitTest.java @@ -109,7 +109,7 @@ public class OldClientSupportDUnitTest extends JUnit4CacheTestCase { Version oldClientVersion = Version.GFE_82; VersionedDataOutputStream dout = new VersionedDataOutputStream( - new HeapDataOutputStream(10, oldClientVersion), oldClientVersion.ordinal()); + new HeapDataOutputStream(10, oldClientVersion), oldClientVersion); for (String geodeClassName : newArrayClassNames) { String newName = oldClientSupport.processOutgoingClassName(geodeClassName, dout); @@ -122,7 +122,7 @@ public class OldClientSupportDUnitTest extends JUnit4CacheTestCase { } VersionedDataInputStream din = new VersionedDataInputStream( - new DataInputStream(new ByteArrayInputStream(new byte[10])), oldClientVersion.ordinal()); + new DataInputStream(new ByteArrayInputStream(new byte[10])), oldClientVersion); for (String oldClassName : oldArrayClassNames) { String newName = oldClientSupport.processIncomingClassName(oldClassName, din); @@ -198,7 +198,7 @@ public class OldClientSupportDUnitTest extends JUnit4CacheTestCase { byte[] serializedForm = byteStream.toByteArray(); ByteArrayDataInput byteDataInput = new ByteArrayDataInput(); - byteDataInput.initialize(serializedForm, Version.GFE_82.ordinal()); + byteDataInput.initialize(serializedForm, Version.GFE_82); ClientSerializableObject result = DataSerializer.readObject(byteDataInput); Assert.assertEquals("Expected an org.apache.geode exception but found " + result, result.getClass().getName().substring(0, "org.apache.geode".length()), "org.apache.geode"); @@ -225,7 +225,7 @@ public class OldClientSupportDUnitTest extends JUnit4CacheTestCase { byte[] serializedForm = byteStream.toByteArray(); ByteArrayDataInput byteDataInput = new ByteArrayDataInput(); - byteDataInput.initialize(serializedForm, Version.GFE_82.ordinal()); + byteDataInput.initialize(serializedForm, Version.GFE_82); Object result = DataSerializer.readObject(byteDataInput); Assert.assertEquals("Expected an org.apache.geode object but found " + result, result.getClass().getName().substring(0, "org.apache.geode".length()), "org.apache.geode"); @@ -249,7 +249,7 @@ public class OldClientSupportDUnitTest extends JUnit4CacheTestCase { byte[] serializedForm = byteStream.toByteArray(); ByteArrayDataInput byteDataInput = new ByteArrayDataInput(); - byteDataInput.initialize(serializedForm, Version.GFE_82.ordinal()); + byteDataInput.initialize(serializedForm, Version.GFE_82); Object result = DataSerializer.readObject(byteDataInput); Assert.assertEquals("Expected an org.apache.geode object but found " + result, result.getClass().getName().substring(0, "org.apache.geode".length()), "org.apache.geode"); diff --git a/geode-old-client-support/src/main/java/com/gemstone/gemfire/OldClientSupportProvider.java b/geode-old-client-support/src/main/java/com/gemstone/gemfire/OldClientSupportProvider.java index d6878e4..a0695ce 100644 --- a/geode-old-client-support/src/main/java/com/gemstone/gemfire/OldClientSupportProvider.java +++ b/geode-old-client-support/src/main/java/com/gemstone/gemfire/OldClientSupportProvider.java @@ -26,6 +26,7 @@ import org.apache.geode.internal.Version; import org.apache.geode.internal.cache.CacheService; import org.apache.geode.internal.cache.InternalCache; import org.apache.geode.internal.cache.tier.sockets.OldClientSupportService; +import org.apache.geode.internal.serialization.SerializationVersion; import org.apache.geode.internal.serialization.VersionedDataOutputStream; import org.apache.geode.management.internal.beans.CacheServiceMBeanBase; @@ -106,7 +107,7 @@ public class OldClientSupportProvider implements OldClientSupportService { // if the client is old then it needs com.gemstone.gemfire package names if (out instanceof VersionedDataOutputStream) { VersionedDataOutputStream vout = (VersionedDataOutputStream) out; - Version version = Version.getVersionForDataStream(vout); + SerializationVersion version = vout.getVersion(); if (version != null && version.compareTo(Version.GFE_90) < 0) { return processClassName(name, GEODE, GEMFIRE, newClassNamesToOld); }