ctubbsii commented on a change in pull request #2224:
URL: https://github.com/apache/accumulo/pull/2224#discussion_r717183367



##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Version properties maintain a {@code Map<String,String>}; of property k,v 
pairs along with
+ * versioning information metadata.
+ * <p>
+ * The metadata used to verify cached values match stored values. Storing the 
metadata with the
+ * properties allows for comparison of properties and can be used to ensure 
that values being
+ * written to the backend store have not changed. This metadata should be 
written / appear early in
+ * the encoded bytes and be uncompressed so that decisions can be made that 
may make deserialization
+ * unnecessary.
+ * <p>
+ * Note: Avoid using -1 because that has significance in ZooKeeper - writing a 
ZooKeeper node with a
+ * version of -1 disables the ZooKeeper expected version checking and just 
overwrites the node.
+ * <p>
+ * Instances of this class are immutable.
+ */
+public class VersionedProperties {
+
+  public static final DateTimeFormatter tsFormatter =
+      
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
+  // flag value for initialization - on store both the version and next 
version should be 0.
+  private static final int NO_VERSION = -2;
+  private final int dataVersion;
+  private final Instant timestamp;
+  private final ImmutableMap<String,String> props;
+
+  /**
+   * Instantiate an initial instance with default version info and empty map.
+   */
+  public VersionedProperties() {
+    this(NO_VERSION, Instant.now(), Collections.emptyMap());

Review comment:
       ```suggestion
       this(Map.of());
   ```

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+

Review comment:
       This import is needed by suggestions farther down in the file:
   
   ```suggestion
   import static java.util.Objects.requireNonNull;
   
   ```

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodecTest.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VersionedPropEncryptCodecTest {
+
+  private final Logger log = 
LoggerFactory.getLogger(VersionedPropEncryptCodecTest.class);
+
+  /**
+   * Perform a round trip - encode, decode set of operations.
+   *
+   * @throws Exception
+   *           an exception is a test failure.
+   */
+  @Test
+  public void roundTripSample() throws Exception {
+
+    // set-up sample "secret" key - for testing only.
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropEncryptCodec.GCMCipherParams cipherProps =
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt);
+
+    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
+    cipher.init(Cipher.ENCRYPT_MODE, cipherProps.getSecretKey(), 
cipherProps.getParameterSpec());
+
+    byte[] payload;
+
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+
+      CipherOutputStream cos = new CipherOutputStream(bos, cipher);
+
+      DataOutputStream dos = new DataOutputStream(cos);
+
+      dos.writeUTF("A");
+      dos.writeUTF("B");
+      dos.writeUTF("C");
+
+      cos.flush();
+      cos.close();
+
+      payload = bos.toByteArray();
+
+      log.debug("Output: {}", payload);
+
+    }
+
+    cipher.init(Cipher.DECRYPT_MODE, cipherProps.getSecretKey(), 
cipherProps.getParameterSpec());
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(payload)) {
+
+      // write the property map keys, values.
+      try (CipherInputStream cis = new CipherInputStream(bis, cipher);
+
+          DataInputStream cdis = new DataInputStream(cis)) {
+

Review comment:
       ```suggestion
       try (var bis = new ByteArrayInputStream(payload)) {
   
         // write the property map keys, values.
         try (var cis = new CipherInputStream(bis, cipher);
             var cdis = new DataInputStream(cis)) {
   
   ```

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodecTest.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VersionedPropEncryptCodecTest {
+
+  private final Logger log = 
LoggerFactory.getLogger(VersionedPropEncryptCodecTest.class);
+
+  /**
+   * Perform a round trip - encode, decode set of operations.
+   *
+   * @throws Exception
+   *           an exception is a test failure.
+   */
+  @Test
+  public void roundTripSample() throws Exception {
+
+    // set-up sample "secret" key - for testing only.
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropEncryptCodec.GCMCipherParams cipherProps =
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt);
+
+    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
+    cipher.init(Cipher.ENCRYPT_MODE, cipherProps.getSecretKey(), 
cipherProps.getParameterSpec());
+
+    byte[] payload;
+
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+
+      CipherOutputStream cos = new CipherOutputStream(bos, cipher);
+
+      DataOutputStream dos = new DataOutputStream(cos);
+
+      dos.writeUTF("A");
+      dos.writeUTF("B");
+      dos.writeUTF("C");
+
+      cos.flush();
+      cos.close();
+
+      payload = bos.toByteArray();
+
+      log.debug("Output: {}", payload);
+
+    }
+
+    cipher.init(Cipher.DECRYPT_MODE, cipherProps.getSecretKey(), 
cipherProps.getParameterSpec());
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(payload)) {
+
+      // write the property map keys, values.
+      try (CipherInputStream cis = new CipherInputStream(bis, cipher);
+
+          DataInputStream cdis = new DataInputStream(cis)) {
+
+        assertEquals("A", cdis.readUTF());
+        assertEquals("B", cdis.readUTF());
+        assertEquals("C", cdis.readUTF());
+      }
+    }
+  }
+
+  /**
+   * Validate versioning with something other than default.
+   */
+  @Test
+  public void roundTripEncryption() throws Exception {
+
+    int aVersion = 13;
+    Instant now = Instant.now();
+
+    Map<String,String> p = new HashMap<>();
+    p.put("k1", "v1");
+
+    VersionedProperties vProps = new VersionedProperties(aVersion, now, p);
+
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropCodec encoder = VersionedPropEncryptCodec.codec(false,
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+    byte[] encodedBytes = encoder.toBytes(vProps);
+
+    log.debug("Encoded: {}", encodedBytes);
+
+    VersionedProperties decodedProps = encoder.fromBytes(encodedBytes);
+
+    log.debug("Decoded: {}", decodedProps.print(true));
+
+    assertEquals(vProps.getProperties(), decodedProps.getProperties());
+
+    // validate that the expected node version matches original version.
+    assertEquals(aVersion, vProps.getDataVersion());
+
+    // validate encoded version incremented.
+    assertEquals(aVersion + 1, decodedProps.getDataVersion());
+
+    assertEquals("encoded version should be 1 up", aVersion + 1, 
decodedProps.getDataVersion());
+    assertEquals("version written should be the source next version", 
vProps.getNextVersion(),
+        decodedProps.getDataVersion());
+    assertEquals("the next version in decoded should be +2", aVersion + 2,
+        decodedProps.getNextVersion());
+
+    assertTrue("timestamp should be now or earlier",
+        vProps.getTimestamp().compareTo(Instant.now()) <= 0);
+
+  }
+
+  /**
+   * Validate versioning with something other than default.
+   */
+  @Test
+  public void roundTripEncryptionCompressed() throws Exception {
+
+    int aVersion = 13;
+    Instant now = Instant.now();
+
+    // compression friendly
+    Map<String,String> p = new HashMap<>();
+    p.put("accumulo.prop.key_name.1", "value1");
+    p.put("accumulo.prop.key_name.2", "value2");
+    p.put("accumulo.prop.key_name.3", "value3");
+    p.put("accumulo.prop.key_name.4", "value4");
+    p.put("accumulo.prop.key_name.5", "value5");
+    p.put("accumulo.prop.key_name.6", "value9");
+
+    VersionedProperties vProps = new VersionedProperties(aVersion, now, p);
+
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropCodec encoder1 = VersionedPropEncryptCodec.codec(true,
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+    byte[] encodedBytes = encoder1.toBytes(vProps);
+
+    log.debug("len: {}, bytes: {}", encodedBytes.length, encodedBytes);
+
+    VersionedProperties decodedProps = encoder1.fromBytes(encodedBytes);
+
+    log.debug("Decoded: {}", decodedProps.print(true));
+
+    assertEquals(vProps.getProperties(), decodedProps.getProperties());
+
+    // validate that the expected node version matches original version.
+    assertEquals(aVersion, vProps.getDataVersion());
+
+    // validate encoded version incremented.
+    assertEquals(aVersion + 1, decodedProps.getDataVersion());
+
+    assertEquals("encoded version should be 1 up", aVersion + 1, 
decodedProps.getDataVersion());
+    assertEquals("version written should be the source next version", 
vProps.getNextVersion(),
+        decodedProps.getDataVersion());
+    assertEquals("the next version in decoded should be +2", aVersion + 2,
+        decodedProps.getNextVersion());
+
+    assertTrue("timestamp should be now or earlier",
+        vProps.getTimestamp().compareTo(Instant.now()) <= 0);
+
+  }
+
+  @Test
+  public void validateEncryptedValuesChange() throws Exception {
+
+    int aVersion = 13;
+    Instant now = Instant.now();
+
+    Map<String,String> p = new HashMap<>();
+    p.put("k1", "v1");
+
+    VersionedProperties vProps = new VersionedProperties(aVersion, now, p);

Review comment:
       ```suggestion
       var vProps = new VersionedProperties(aVersion, Instant.now(), 
Map.of("k1", "v1"));
   ```

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/codec/EncodingOptions.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.StringJoiner;
+
+/**
+ * Serialization metadata to allow for evolution of the encoding used for 
property storage. This
+ * info is expected to be stored first in the serialization and uncompressed 
so that the handling of
+ * subsequent fields and data can be processed correctly and without 
additional processing.
+ * <p>
+ * Instances of this class are immutable.
+ */
+public class EncodingOptions {
+
+  // Adding an encoding version must be done as an addition. Do not change or 
delete previous
+  // version numbers
+  public static final int EncodingVersion_1_0 = 1;
+  public static final int EXPERIMENTAL_CIPHER_ENCODING_1_0 = 999;
+
+  private final int encodingVersion;
+  private final boolean compress;
+
+  EncodingOptions(final int encodingVersion, final boolean compress) {
+    this.encodingVersion = encodingVersion;
+    this.compress = compress;
+  }
+
+  /**
+   * Instantiate encoding options to use version 1.0 encoding settings.
+   *
+   * @param compress
+   *          when true compress the property map.
+   * @return the encoding options.
+   */
+  public static EncodingOptions V1_0(final boolean compress) {
+    return new EncodingOptions(EncodingVersion_1_0, compress);
+  }
+
+  /**
+   * Instantiate an instance of EncodingOptions reading the values from an 
input stream. Typically,
+   * the stream will be obtained from reading a byte array from a data store 
and then creating a
+   * stream that reads from that array,
+   *
+   * @param dis
+   *          a data input stream
+   * @throws IOException
+   *           if an exception occurs reading from the input stream.
+   */
+  public EncodingOptions(final DataInputStream dis) throws IOException {

Review comment:
       This could be a named static factory method rather than a constructor. 
That would be an opportunity to use a more explicit name (similar to your 
`V1_0` name above), for example `fromStream`, or just `decode` to match the 
member method named `encode`. `write()` and `read()` would also work (and 
would conflict less with the name of the class, which refers to encoding 
options for properties; a method named `encode` that encodes the encoding 
options is a bit confusing).

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Version properties maintain a {@code Map<String,String>}; of property k,v 
pairs along with
+ * versioning information metadata.
+ * <p>
+ * The metadata used to verify cached values match stored values. Storing the 
metadata with the
+ * properties allows for comparison of properties and can be used to ensure 
that values being
+ * written to the backend store have not changed. This metadata should be 
written / appear early in
+ * the encoded bytes and be uncompressed so that decisions can be made that 
may make deserialization
+ * unnecessary.
+ * <p>
+ * Note: Avoid using -1 because that has significance in ZooKeeper - writing a 
ZooKeeper node with a
+ * version of -1 disables the ZooKeeper expected version checking and just 
overwrites the node.
+ * <p>
+ * Instances of this class are immutable.
+ */
+public class VersionedProperties {
+
+  public static final DateTimeFormatter tsFormatter =
+      
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
+  // flag value for initialization - on store both the version and next 
version should be 0.
+  private static final int NO_VERSION = -2;
+  private final int dataVersion;
+  private final Instant timestamp;
+  private final ImmutableMap<String,String> props;

Review comment:
       Can avoid the Guava type by using Java's own built-in immutable types:
   
   ```suggestion
     private final Map<String,String> props;
   ```

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodecTest.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;

Review comment:
       Remove the imports that are no longer needed once the unused `keyGen` method is deleted:
   ```suggestion
   import java.time.Instant;
   import java.util.HashMap;
   import java.util.Map;
   
   import javax.crypto.Cipher;
   import javax.crypto.CipherInputStream;
   import javax.crypto.CipherOutputStream;
   ```

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodecTest.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VersionedPropEncryptCodecTest {
+
+  private final Logger log = 
LoggerFactory.getLogger(VersionedPropEncryptCodecTest.class);
+
+  /**
+   * Perform a round trip - encode, decode set of operations.
+   *
+   * @throws Exception
+   *           an exception is a test failure.
+   */
+  @Test
+  public void roundTripSample() throws Exception {
+
+    // set-up sample "secret" key - for testing only.
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropEncryptCodec.GCMCipherParams cipherProps =
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt);
+
+    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
+    cipher.init(Cipher.ENCRYPT_MODE, cipherProps.getSecretKey(), 
cipherProps.getParameterSpec());
+
+    byte[] payload;
+
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+
+      CipherOutputStream cos = new CipherOutputStream(bos, cipher);
+
+      DataOutputStream dos = new DataOutputStream(cos);
+
+      dos.writeUTF("A");
+      dos.writeUTF("B");
+      dos.writeUTF("C");
+
+      cos.flush();
+      cos.close();
+
+      payload = bos.toByteArray();
+
+      log.debug("Output: {}", payload);
+
+    }
+
+    cipher.init(Cipher.DECRYPT_MODE, cipherProps.getSecretKey(), 
cipherProps.getParameterSpec());
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(payload)) {
+
+      // write the property map keys, values.
+      try (CipherInputStream cis = new CipherInputStream(bis, cipher);
+
+          DataInputStream cdis = new DataInputStream(cis)) {
+
+        assertEquals("A", cdis.readUTF());
+        assertEquals("B", cdis.readUTF());
+        assertEquals("C", cdis.readUTF());
+      }
+    }
+  }
+
+  /**
+   * Validate versioning with something other than default.
+   */
+  @Test
+  public void roundTripEncryption() throws Exception {
+
+    int aVersion = 13;
+    Instant now = Instant.now();
+
+    Map<String,String> p = new HashMap<>();
+    p.put("k1", "v1");
+
+    VersionedProperties vProps = new VersionedProperties(aVersion, now, p);
+
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropCodec encoder = VersionedPropEncryptCodec.codec(false,
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+    byte[] encodedBytes = encoder.toBytes(vProps);
+
+    log.debug("Encoded: {}", encodedBytes);
+
+    VersionedProperties decodedProps = encoder.fromBytes(encodedBytes);
+
+    log.debug("Decoded: {}", decodedProps.print(true));
+
+    assertEquals(vProps.getProperties(), decodedProps.getProperties());
+
+    // validate that the expected node version matches original version.
+    assertEquals(aVersion, vProps.getDataVersion());
+
+    // validate encoded version incremented.
+    assertEquals(aVersion + 1, decodedProps.getDataVersion());
+
+    assertEquals("encoded version should be 1 up", aVersion + 1, 
decodedProps.getDataVersion());
+    assertEquals("version written should be the source next version", 
vProps.getNextVersion(),
+        decodedProps.getDataVersion());
+    assertEquals("the next version in decoded should be +2", aVersion + 2,
+        decodedProps.getNextVersion());
+
+    assertTrue("timestamp should be now or earlier",
+        vProps.getTimestamp().compareTo(Instant.now()) <= 0);
+
+  }
+
+  /**
+   * Validate versioning with something other than default.
+   */
+  @Test
+  public void roundTripEncryptionCompressed() throws Exception {
+
+    int aVersion = 13;
+    Instant now = Instant.now();
+
+    // compression friendly
+    Map<String,String> p = new HashMap<>();
+    p.put("accumulo.prop.key_name.1", "value1");
+    p.put("accumulo.prop.key_name.2", "value2");
+    p.put("accumulo.prop.key_name.3", "value3");
+    p.put("accumulo.prop.key_name.4", "value4");
+    p.put("accumulo.prop.key_name.5", "value5");
+    p.put("accumulo.prop.key_name.6", "value9");
+
+    VersionedProperties vProps = new VersionedProperties(aVersion, now, p);
+
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropCodec encoder1 = VersionedPropEncryptCodec.codec(true,
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+    byte[] encodedBytes = encoder1.toBytes(vProps);
+
+    log.debug("len: {}, bytes: {}", encodedBytes.length, encodedBytes);
+
+    VersionedProperties decodedProps = encoder1.fromBytes(encodedBytes);
+
+    log.debug("Decoded: {}", decodedProps.print(true));
+
+    assertEquals(vProps.getProperties(), decodedProps.getProperties());
+
+    // validate that the expected node version matches original version.
+    assertEquals(aVersion, vProps.getDataVersion());
+
+    // validate encoded version incremented.
+    assertEquals(aVersion + 1, decodedProps.getDataVersion());
+
+    assertEquals("encoded version should be 1 up", aVersion + 1, 
decodedProps.getDataVersion());
+    assertEquals("version written should be the source next version", 
vProps.getNextVersion(),
+        decodedProps.getDataVersion());
+    assertEquals("the next version in decoded should be +2", aVersion + 2,
+        decodedProps.getNextVersion());
+
+    assertTrue("timestamp should be now or earlier",
+        vProps.getTimestamp().compareTo(Instant.now()) <= 0);
+
+  }
+
+  @Test
+  public void validateEncryptedValuesChange() throws Exception {
+
+    int aVersion = 13;
+    Instant now = Instant.now();
+
+    Map<String,String> p = new HashMap<>();
+    p.put("k1", "v1");
+
+    VersionedProperties vProps = new VersionedProperties(aVersion, now, p);
+
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropCodec codec1 = VersionedPropEncryptCodec.codec(false,
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+    byte[] encodedBytes1 = codec1.toBytes(vProps);
+
+    VersionedPropCodec codec2 = VersionedPropEncryptCodec.codec(false,
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+    byte[] encodedBytes2 = codec2.toBytes(vProps);
+
+    log.debug("Encoded: {}", encodedBytes1);
+    log.debug("Encoded: {}", encodedBytes2);
+
+    VersionedProperties from2 = codec1.fromBytes(encodedBytes2);
+    VersionedProperties from1 = codec2.fromBytes(encodedBytes1);
+
+    assertEquals(from1.getProperties(), from2.getProperties());
+
+    VersionedPropCodec codec3 = VersionedPropEncryptCodec.codec(false,
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+    VersionedProperties from3 = codec3.fromBytes(encodedBytes1);
+    assertEquals(from1.getDataVersion(), from3.getDataVersion());
+    assertEquals(from1.getProperties(), from3.getProperties());
+
+    assertNotEquals(encodedBytes1, encodedBytes2);
+
+  }
+
+  private String keyGen() throws NoSuchAlgorithmException {
+    SecretKey secretKey = KeyGenerator.getInstance("AES").generateKey();
+    return Base64.getEncoder().encodeToString(secretKey.getEncoded());
+  }

Review comment:
       This method isn't used and can be deleted.
   
   ```suggestion
   ```

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodecTest.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VersionedPropEncryptCodecTest {
+
+  private final Logger log = 
LoggerFactory.getLogger(VersionedPropEncryptCodecTest.class);
+
+  /**
+   * Perform a round trip - encode, decode set of operations.
+   *
+   * @throws Exception
+   *           an exception is a test failure.
+   */
+  @Test
+  public void roundTripSample() throws Exception {
+
+    // set-up sample "secret" key - for testing only.
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropEncryptCodec.GCMCipherParams cipherProps =
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt);
+
+    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
+    cipher.init(Cipher.ENCRYPT_MODE, cipherProps.getSecretKey(), 
cipherProps.getParameterSpec());
+
+    byte[] payload;
+
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+
+      CipherOutputStream cos = new CipherOutputStream(bos, cipher);
+
+      DataOutputStream dos = new DataOutputStream(cos);

Review comment:
       More places where `var` could be useful:
   ```suggestion
       try (var bos = new ByteArrayOutputStream()) {
   
         var cos = new CipherOutputStream(bos, cipher);
   
         var dos = new DataOutputStream(cos);
   ```

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodecTest.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VersionedPropEncryptCodecTest {
+
+  private final Logger log = 
LoggerFactory.getLogger(VersionedPropEncryptCodecTest.class);
+
+  /**
+   * Perform a round trip - encode, decode set of operations.
+   *
+   * @throws Exception
+   *           an exception is a test failure.
+   */
+  @Test
+  public void roundTripSample() throws Exception {
+
+    // set-up sample "secret" key - for testing only.
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropEncryptCodec.GCMCipherParams cipherProps =
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt);
+
+    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");

Review comment:
       Using var to clean up redundant type specifications in local variables:
   ```suggestion
       var cipherProps = new VersionedPropEncryptCodec.GCMCipherParams(pass, 
salt);
   
       var cipher = Cipher.getInstance("AES/GCM/NoPadding");
   ```

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Version properties maintain a {@code Map<String,String>}; of property k,v 
pairs along with
+ * versioning information metadata.
+ * <p>
+ * The metadata used to verify cached values match stored values. Storing the 
metadata with the
+ * properties allows for comparison of properties and can be used to ensure 
that values being
+ * written to the backend store have not changed. This metadata should be 
written / appear early in
+ * the encoded bytes and be uncompressed so that decisions can be made that 
may make deserialization
+ * unnecessary.
+ * <p>
+ * Note: Avoid using -1 because that has significance in ZooKeeper - writing a 
ZooKeeper node with a
+ * version of -1 disables the ZooKeeper expected version checking and just 
overwrites the node.
+ * <p>
+ * Instances of this class are immutable.
+ */
+public class VersionedProperties {
+
+  public static final DateTimeFormatter tsFormatter =
+      
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
+  // flag value for initialization - on store both the version and next 
version should be 0.
+  private static final int NO_VERSION = -2;
+  private final int dataVersion;
+  private final Instant timestamp;
+  private final ImmutableMap<String,String> props;
+
+  /**
+   * Instantiate an initial instance with default version info and empty map.
+   */
+  public VersionedProperties() {
+    this(NO_VERSION, Instant.now(), Collections.emptyMap());
+  }
+
+  /**
+   * Instantiate an initial instance with default version info and provided 
property map.
+   *
+   * @param props
+   *          optional map of initial property key, value pairs. The 
properties are assumed to have
+   *          been previously validated (if required)
+   */
+  public VersionedProperties(Map<String,String> props) {
+    this(NO_VERSION, Instant.now(), props);
+  }
+
+  /**
+   * Instantiate an instance and set the initial properties to the provided 
values.
+   *
+   * @param dataVersion
+   *          version info with data version and timestamp.
+   * @param timestamp
+   *          timestamp of this version.
+   * @param props
+   *          optional map of initial property key, value pairs. The 
properties are assumed to have
+   *          been previously validated (if required)
+   */
+  public VersionedProperties(final int dataVersion, final Instant timestamp,
+      final Map<String,String> props) {
+    this.dataVersion = dataVersion;
+    this.timestamp = timestamp;
+    if (Objects.nonNull(props)) {
+      this.props = new 
ImmutableMap.Builder<String,String>().putAll(props).build();
+    } else {
+      this.props = new ImmutableMap.Builder<String,String>().build();
+    }
+  }
+
+  /**
+   * Get an unmodifiable map with all property key,value pairs.
+   *
+   * @return An unmodifiable view of the property key, value pairs.
+   */
+  public Map<String,String> getProperties() {
+    return props;
+  }
+
+  /**
+   * Get the current data version. The version should match the node version 
of the stored data. The
+   * value should be used on data writes as the expected version. If the data 
write fails due to an
+   * unexpected version, it signals that the node version has changed since 
the instance was
+   * instantiated and encoded.
+   *
+   * @return 0 for initial version, otherwise the data version when the 
properties were serialized.
+   */
+  public int getDataVersion() {
+    return Math.max(dataVersion, 0);
+  }
+
+  /**
+   * Calculates the version that should be stored when serialized. The 
serialized version, when
+   * stored, should match the version that will be assigned. This way, data 
reading the serialized
+   * version can compare the stored version with the node version at any time 
to detect if the node
+   * version has been updated.
+   * <p>
+   * The initialization of the data version to a negative value allows this 
value to be calculated
+   * correctly for the first serialization. On the first store, the expected 
version will be 0.
+   *
+   * @return the next version number that should be serialized, or 0 if this 
is the initial version.
+   */
+  public int getNextVersion() {
+    return Math.max(dataVersion + 1, 0);
+  }
+
+  /**
+   * The timestamp of the instance when created or last modified.
+   *
+   * @return the timestamp of the instance.
+   */
+  public Instant getTimestamp() {
+    return Instant.from(timestamp);

Review comment:
       Removing the redundant `Instant.from(timestamp)` call: `Instant` is immutable, and 
`Instant.from` simply returns its argument when that argument is already an `Instant`:
   ```suggestion
       return timestamp;
   ```

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import com.google.common.collect.ImmutableMap;
+

Review comment:
       Updating imports for suggestions further down:
   
   ```suggestion
   import java.util.HashMap;
   import java.util.Map;
   import java.util.TreeMap;
   
   ```

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodecTest.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VersionedPropEncryptCodecTest {
+
+  private final Logger log = 
LoggerFactory.getLogger(VersionedPropEncryptCodecTest.class);
+
+  /**
+   * Perform a round trip - encode, decode set of operations.
+   *
+   * @throws Exception
+   *           an exception is a test failure.
+   */
+  @Test
+  public void roundTripSample() throws Exception {
+
+    // set-up sample "secret" key - for testing only.
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropEncryptCodec.GCMCipherParams cipherProps =
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt);
+
+    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
+    cipher.init(Cipher.ENCRYPT_MODE, cipherProps.getSecretKey(), 
cipherProps.getParameterSpec());
+
+    byte[] payload;
+
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+
+      CipherOutputStream cos = new CipherOutputStream(bos, cipher);
+
+      DataOutputStream dos = new DataOutputStream(cos);
+
+      dos.writeUTF("A");
+      dos.writeUTF("B");
+      dos.writeUTF("C");
+
+      cos.flush();
+      cos.close();
+
+      payload = bos.toByteArray();
+
+      log.debug("Output: {}", payload);
+
+    }
+
+    cipher.init(Cipher.DECRYPT_MODE, cipherProps.getSecretKey(), 
cipherProps.getParameterSpec());
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(payload)) {
+
+      // read back the values written above.
+      try (CipherInputStream cis = new CipherInputStream(bis, cipher);
+
+          DataInputStream cdis = new DataInputStream(cis)) {
+
+        assertEquals("A", cdis.readUTF());
+        assertEquals("B", cdis.readUTF());
+        assertEquals("C", cdis.readUTF());
+      }
+    }
+  }
+
+  /**
+   * Validate versioning with something other than default.
+   */
+  @Test
+  public void roundTripEncryption() throws Exception {
+
+    int aVersion = 13;
+    Instant now = Instant.now();
+
+    Map<String,String> p = new HashMap<>();
+    p.put("k1", "v1");
+
+    VersionedProperties vProps = new VersionedProperties(aVersion, now, p);
+
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropCodec encoder = VersionedPropEncryptCodec.codec(false,
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt));
+
+    byte[] encodedBytes = encoder.toBytes(vProps);
+
+    log.debug("Encoded: {}", encodedBytes);
+
+    VersionedProperties decodedProps = encoder.fromBytes(encodedBytes);
+
+    log.debug("Decoded: {}", decodedProps.print(true));
+
+    assertEquals(vProps.getProperties(), decodedProps.getProperties());
+
+    // validate that the expected node version matches original version.
+    assertEquals(aVersion, vProps.getDataVersion());
+
+    // validate encoded version incremented.
+    assertEquals(aVersion + 1, decodedProps.getDataVersion());
+
+    assertEquals("encoded version should be 1 up", aVersion + 1, 
decodedProps.getDataVersion());
+    assertEquals("version written should be the source next version", 
vProps.getNextVersion(),
+        decodedProps.getDataVersion());
+    assertEquals("the next version in decoded should be +2", aVersion + 2,
+        decodedProps.getNextVersion());
+
+    assertTrue("timestamp should be now or earlier",
+        vProps.getTimestamp().compareTo(Instant.now()) <= 0);
+
+  }
+
+  /**
+   * Validate versioning with something other than default.
+   */
+  @Test
+  public void roundTripEncryptionCompressed() throws Exception {
+
+    int aVersion = 13;
+    Instant now = Instant.now();
+
+    // compression friendly
+    Map<String,String> p = new HashMap<>();
+    p.put("accumulo.prop.key_name.1", "value1");
+    p.put("accumulo.prop.key_name.2", "value2");
+    p.put("accumulo.prop.key_name.3", "value3");
+    p.put("accumulo.prop.key_name.4", "value4");
+    p.put("accumulo.prop.key_name.5", "value5");
+    p.put("accumulo.prop.key_name.6", "value9");
+
+    VersionedProperties vProps = new VersionedProperties(aVersion, now, p);

Review comment:
       ```suggestion
   
       // compression friendly
       var p = Map.of("accumulo.prop.key_name.1", "value1", 
"accumulo.prop.key_name.2", "value2",
           "accumulo.prop.key_name.3", "value3", "accumulo.prop.key_name.4", 
"value4",
           "accumulo.prop.key_name.5", "value5", "accumulo.prop.key_name.6", 
"value9");
   
       VersionedProperties vProps = new VersionedProperties(aVersion, 
Instant.now(), p);
   ```

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Version properties maintain a {@code Map<String,String>}; of property k,v 
pairs along with
+ * versioning information metadata.
+ * <p>
+ * The metadata used to verify cached values match stored values. Storing the 
metadata with the
+ * properties allows for comparison of properties and can be used to ensure 
that values being
+ * written to the backend store have not changed. This metadata should be 
written / appear early in
+ * the encoded bytes and be uncompressed so that decisions can be made that 
may make deserialization
+ * unnecessary.
+ * <p>
+ * Note: Avoid using -1 because that has significance in ZooKeeper - writing a 
ZooKeeper node with a
+ * version of -1 disables the ZooKeeper expected version checking and just 
overwrites the node.
+ * <p>
+ * Instances of this class are immutable.
+ */
+public class VersionedProperties {
+
+  public static final DateTimeFormatter tsFormatter =
+      
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
+  // flag value for initialization - on store both the version and next 
version should be 0.
+  private static final int NO_VERSION = -2;
+  private final int dataVersion;
+  private final Instant timestamp;
+  private final ImmutableMap<String,String> props;
+
+  /**
+   * Instantiate an initial instance with default version info and empty map.
+   */
+  public VersionedProperties() {
+    this(NO_VERSION, Instant.now(), Collections.emptyMap());
+  }
+
+  /**
+   * Instantiate an initial instance with default version info and provided 
property map.
+   *
+   * @param props
+   *          optional map of initial property key, value pairs. The 
properties are assumed to have
+   *          been previously validated (if required)
+   */
+  public VersionedProperties(Map<String,String> props) {
+    this(NO_VERSION, Instant.now(), props);
+  }
+
+  /**
+   * Instantiate an instance and set the initial properties to the provided 
values.
+   *
+   * @param dataVersion
+   *          version info with data version and timestamp.
+   * @param timestamp
+   *          timestamp of this version.
+   * @param props
+   *          optional map of initial property key, value pairs. The 
properties are assumed to have
+   *          been previously validated (if required)
+   */
+  public VersionedProperties(final int dataVersion, final Instant timestamp,
+      final Map<String,String> props) {
+    this.dataVersion = dataVersion;
+    this.timestamp = timestamp;
+    if (Objects.nonNull(props)) {
+      this.props = new 
ImmutableMap.Builder<String,String>().putAll(props).build();
+    } else {
+      this.props = new ImmutableMap.Builder<String,String>().build();
+    }

Review comment:
       Make immutable copy using built-ins, avoiding Guava:
   
   ```suggestion
       this.timestamp = requireNonNull(timestamp);
       this.props = props == null ? Map.of() : Map.copyOf(props);
   ```

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Version properties maintain a {@code Map<String,String>}; of property k,v 
pairs along with
+ * versioning information metadata.
+ * <p>
+ * The metadata used to verify cached values match stored values. Storing the 
metadata with the
+ * properties allows for comparison of properties and can be used to ensure 
that values being
+ * written to the backend store have not changed. This metadata should be 
written / appear early in
+ * the encoded bytes and be uncompressed so that decisions can be made that 
may make deserialization
+ * unnecessary.
+ * <p>
+ * Note: Avoid using -1 because that has significance in ZooKeeper - writing a 
ZooKeeper node with a
+ * version of -1 disables the ZooKeeper expected version checking and just 
overwrites the node.
+ * <p>
+ * Instances of this class are immutable.
+ */
+public class VersionedProperties {
+
+  public static final DateTimeFormatter tsFormatter =
+      
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
+  // flag value for initialization - on store both the version and next 
version should be 0.
+  private static final int NO_VERSION = -2;
+  private final int dataVersion;
+  private final Instant timestamp;
+  private final ImmutableMap<String,String> props;
+
+  /**
+   * Instantiate an initial instance with default version info and empty map.
+   */
+  public VersionedProperties() {
+    this(NO_VERSION, Instant.now(), Collections.emptyMap());
+  }
+
+  /**
+   * Instantiate an initial instance with default version info and provided 
property map.
+   *
+   * @param props
+   *          optional map of initial property key, value pairs. The 
properties are assumed to have
+   *          been previously validated (if required)
+   */
+  public VersionedProperties(Map<String,String> props) {
+    this(NO_VERSION, Instant.now(), props);
+  }
+
+  /**
+   * Instantiate an instance and set the initial properties to the provided 
values.
+   *
+   * @param dataVersion
+   *          version info with data version and timestamp.
+   * @param timestamp
+   *          timestamp of this version.
+   * @param props
+   *          optional map of initial property key, value pairs. The 
properties are assumed to have
+   *          been previously validated (if required)
+   */
+  public VersionedProperties(final int dataVersion, final Instant timestamp,
+      final Map<String,String> props) {
+    this.dataVersion = dataVersion;
+    this.timestamp = timestamp;
+    if (Objects.nonNull(props)) {
+      this.props = new 
ImmutableMap.Builder<String,String>().putAll(props).build();
+    } else {
+      this.props = new ImmutableMap.Builder<String,String>().build();
+    }
+  }
+
+  /**
+   * Get an unmodifiable map with all property key,value pairs.
+   *
+   * @return An unmodifiable view of the property key, value pairs.
+   */
+  public Map<String,String> getProperties() {
+    return props;
+  }
+
+  /**
+   * Get the current data version. The version should match the node version 
of the stored data. The
+   * value should be used on data writes as the expected version. If the data 
write fails due to an
+   * unexpected version, it signals that the node version has changed since 
the instance was
+   * instantiated and encoded.
+   *
+   * @return 0 for initial version, otherwise the data version when the 
properties were serialized.
+   */
+  public int getDataVersion() {
+    return Math.max(dataVersion, 0);
+  }
+
+  /**
+   * Calculates the version that should be stored when serialized. The 
serialized version, when
+   * stored, should match the version that will be assigned. This way, data 
reading the serialized
+   * version can compare the stored version with the node version at any time 
to detect if the node
+   * version has been updated.
+   * <p>
+   * The initialization of the data version to a negative value allows this 
value to be calculated
+   * correctly for the first serialization. On the first store, the expected 
version will be 0.
+   *
+   * @return the next version number that should be serialized, or 0 if this 
is the initial version.
+   */
+  public int getNextVersion() {
+    return Math.max(dataVersion + 1, 0);
+  }
+
+  /**
+   * The timestamp of the instance when created or last modified.
+   *
+   * @return the timestamp of the instance.
+   */
+  public Instant getTimestamp() {
+    return Instant.from(timestamp);
+  }
+
+  /**
+   * The timestamp formatted as an ISO 8601 string with format of
+   * {@code YYYY-MM-DDTHH:mm:ss.SSSSSSZ}
+   *
+   * @return a formatted timestamp string.
+   */
+  public String getTimestampISO() {
+    return tsFormatter.format(timestamp);
+  }
+
+  /**
+   * Update a single property. If a property already exists it is overwritten.
+   * <p>
+   * It is much more efficient to add multiple properties at a time rather 
than one by one.
+   * <p>
+   * Because instances of this class are immutable, this method creates a new 
copy of the
+   * properties. Other processes will continue to see original values 
retrieved from the data store.
+   * Other processes will receive an update when the instance is encoded and 
stored in the data
+   * store and then retrieved with the normal store update mechanisms.
+   *
+   * @param key
+   *          the property name.
+   * @param value
+   *          the property value.
+   * @return A new instance of this class with the property added or updated.
+   */
+  public VersionedProperties addOrUpdate(final String key, final String value) 
{
+    ImmutableMap<String,String> updated =
+        ImmutableMap.<String,String>builder().putAll(new HashMap<>() {
+          {
+            putAll(props);
+            put(key, value);
+          }
+        }).build();
+    return new VersionedProperties(dataVersion, Instant.now(), updated);
+  }
+
+  /**
+   * Add or update multiple properties. If a property already exists it is 
overwritten.
+   * <p>
+   * Because instances of this class are immutable, this method creates a new 
copy of the
+   * properties. Other processes will continue to see original values 
retrieved from the data store.
+   * Other processes will receive an update when the instance is encoded and 
stored in the data
+   * store and then retrieved with the normal store update mechanisms.
+   *
+   * @param updates
+   *          A map of key, values pairs.
+   * @return A new instance of this class with the properties added or updated.
+   */
+  public VersionedProperties addOrUpdate(final Map<String,String> updates) {
+    ImmutableMap<String,String> updated =
+        ImmutableMap.<String,String>builder().putAll(new HashMap<>() {
+          {
+            putAll(props);
+            putAll(updates);
+          }
+        }).build();
+    return new VersionedProperties(dataVersion, Instant.now(), updated);
+  }
+
+  /**
+   * Delete multiple properties provided as a collection of keys.
+   * <p>
+   * Because instances of this class are immutable, this method creates a new 
copy of the
+   * properties. Other processes will continue to see original values 
retrieved from the data store.
+   * Other processes will receive an update when the instance is encoded and 
stored in the data
+   * store and then retrieved with the normal store update mechanisms.
+   *
+   * @param keys
+   *          a collection of the keys that if they exist, will be removed.
+   * @return A new instance of this class.
+   */
+  public VersionedProperties remove(Collection<String> keys) {
+
+    HashMap<String,String> orig = new HashMap<>(props);
+    keys.forEach(orig::remove);
+
+    ImmutableMap<String,String> updated =
+        ImmutableMap.<String,String>builder().putAll(orig).build();
+

Review comment:
       Avoid unnecessary complexity, since the constructor will make an 
immutable copy anyway and this is easier to understand and maintain:
   ```suggestion
       var updated = new HashMap<>(props);
       updated.keySet().removeAll(keys); // removing from keySet view removes 
from underlying mapping
   ```

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Versioned properties maintain a {@code Map<String,String>} of property k,v 
pairs along with
+ * versioning information metadata.
+ * <p>
+ * The metadata is used to verify cached values match stored values. Storing the 
metadata with the
+ * properties allows for comparison of properties and can be used to ensure 
that values being
+ * written to the backend store have not changed. This metadata should be 
written / appear early in
+ * the encoded bytes and be uncompressed so that decisions can be made that 
may make deserialization
+ * unnecessary.
+ * <p>
+ * Note: Avoid using -1 because that has significance in ZooKeeper - writing a 
ZooKeeper node with a
+ * version of -1 disables the ZooKeeper expected version checking and just 
overwrites the node.
+ * <p>
+ * Instances of this class are immutable.
+ */
+public class VersionedProperties {
+
+  public static final DateTimeFormatter tsFormatter =
+      
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
+  // flag value for initialization - on store both the version and next 
version should be 0.
+  private static final int NO_VERSION = -2;
+  private final int dataVersion;
+  private final Instant timestamp;
+  private final ImmutableMap<String,String> props;
+
+  /**
+   * Instantiate an initial instance with default version info and empty map.
+   */
+  public VersionedProperties() {
+    this(NO_VERSION, Instant.now(), Collections.emptyMap());
+  }
+
+  /**
+   * Instantiate an initial instance with default version info and provided 
property map.
+   *
+   * @param props
+   *          optional map of initial property key, value pairs. The 
properties are assumed to have
+   *          been previously validated (if required)
+   */
+  public VersionedProperties(Map<String,String> props) {
+    this(NO_VERSION, Instant.now(), props);
+  }
+
+  /**
+   * Instantiate an instance and set the initial properties to the provided 
values.
+   *
+   * @param dataVersion
+   *          version info with data version and timestamp.
+   * @param timestamp
+   *          timestamp of this version.
+   * @param props
+   *          optional map of initial property key, value pairs. The 
properties are assumed to have
+   *          been previously validated (if required)
+   */
+  public VersionedProperties(final int dataVersion, final Instant timestamp,
+      final Map<String,String> props) {
+    this.dataVersion = dataVersion;
+    this.timestamp = timestamp;
+    if (Objects.nonNull(props)) {
+      this.props = new 
ImmutableMap.Builder<String,String>().putAll(props).build();
+    } else {
+      this.props = new ImmutableMap.Builder<String,String>().build();
+    }
+  }
+
+  /**
+   * Get an unmodifiable map with all property key,value pairs.
+   *
+   * @return An unmodifiable view of the property key, value pairs.
+   */
+  public Map<String,String> getProperties() {
+    return props;
+  }
+
+  /**
+   * Get the current data version. The version should match the node version 
of the stored data. The
+   * value should be used on data writes as the expected version. If the data 
write fails due to an
+   * unexpected version, it signals that the node version has changed since 
the instance was
+   * instantiated and encoded.
+   *
+   * @return 0 for initial version, otherwise the data version when the 
properties were serialized.
+   */
+  public int getDataVersion() {
+    return Math.max(dataVersion, 0);
+  }
+
+  /**
+   * Calculates the version that should be stored when serialized. The 
serialized version, when
+   * stored, should match the version that will be assigned. This way, data 
reading the serialized
+   * version can compare the stored version with the node version at any time 
to detect if the node
+   * version has been updated.
+   * <p>
+   * The initialization of the data version to a negative value allows this 
value to be calculated
+   * correctly for the first serialization. On the first store, the expected 
version will be 0.
+   *
+   * @return the next version number that should be serialized, or 0 if this 
is the initial version.
+   */
+  public int getNextVersion() {
+    return Math.max(dataVersion + 1, 0);
+  }
+
+  /**
+   * The timestamp of the instance when created or last modified.
+   *
+   * @return the timestamp of the instance.
+   */
+  public Instant getTimestamp() {
+    return Instant.from(timestamp);
+  }
+
+  /**
+   * The timestamp formatted as an ISO 8601 string with format of
+   * {@code YYYY-MM-DDTHH:mm:ss.SSSSSSZ}
+   *
+   * @return a formatted timestamp string.
+   */
+  public String getTimestampISO() {
+    return tsFormatter.format(timestamp);
+  }
+
+  /**
+   * Update a single property. If a property already exists it is overwritten.
+   * <p>
+   * It is much more efficient to add multiple properties at a time rather 
than one by one.
+   * <p>
+   * Because instances of this class are immutable, this method creates a new 
copy of the
+   * properties. Other processes will continue to see original values 
retrieved from the data store.
+   * Other processes will receive an update when the instance is encoded and 
stored in the data
+   * store and then retrieved with the normal store update mechanisms.
+   *
+   * @param key
+   *          the property name.
+   * @param value
+   *          the property value.
+   * @return A new instance of this class with the property added or updated.
+   */
+  public VersionedProperties addOrUpdate(final String key, final String value) 
{
+    ImmutableMap<String,String> updated =
+        ImmutableMap.<String,String>builder().putAll(new HashMap<>() {
+          {
+            putAll(props);
+            put(key, value);
+          }
+        }).build();
+    return new VersionedProperties(dataVersion, Instant.now(), updated);
+  }
+
+  /**
+   * Add or update multiple properties. If a property already exists it is 
overwritten.
+   * <p>
+   * Because instances of this class are immutable, this method creates a new 
copy of the
+   * properties. Other processes will continue to see original values 
retrieved from the data store.
+   * Other processes will receive an update when the instance is encoded and 
stored in the data
+   * store and then retrieved with the normal store update mechanisms.
+   *
+   * @param updates
+   *          A map of key, values pairs.
+   * @return A new instance of this class with the properties added or updated.
+   */
+  public VersionedProperties addOrUpdate(final Map<String,String> updates) {
+    ImmutableMap<String,String> updated =
+        ImmutableMap.<String,String>builder().putAll(new HashMap<>() {
+          {
+            putAll(props);
+            putAll(updates);
+          }
+        }).build();

Review comment:
       Avoid unnecessary complexity, since the constructor will make an 
immutable copy anyway and this is easier to understand and maintain:
   ```suggestion
       var updated = new HashMap<>(props);
       updated.putAll(updates);
   ```

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedProperties.java
##########
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.ZoneOffset;
+import java.time.format.DateTimeFormatter;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.TreeMap;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Versioned properties maintain a {@code Map<String,String>} of property k,v 
pairs along with
+ * versioning information metadata.
+ * <p>
+ * The metadata is used to verify cached values match stored values. Storing the 
metadata with the
+ * properties allows for comparison of properties and can be used to ensure 
that values being
+ * written to the backend store have not changed. This metadata should be 
written / appear early in
+ * the encoded bytes and be uncompressed so that decisions can be made that 
may make deserialization
+ * unnecessary.
+ * <p>
+ * Note: Avoid using -1 because that has significance in ZooKeeper - writing a 
ZooKeeper node with a
+ * version of -1 disables the ZooKeeper expected version checking and just 
overwrites the node.
+ * <p>
+ * Instances of this class are immutable.
+ */
+public class VersionedProperties {
+
+  public static final DateTimeFormatter tsFormatter =
+      
DateTimeFormatter.ISO_OFFSET_DATE_TIME.withZone(ZoneId.from(ZoneOffset.UTC));
+  // flag value for initialization - on store both the version and next 
version should be 0.
+  private static final int NO_VERSION = -2;
+  private final int dataVersion;
+  private final Instant timestamp;
+  private final ImmutableMap<String,String> props;
+
+  /**
+   * Instantiate an initial instance with default version info and empty map.
+   */
+  public VersionedProperties() {
+    this(NO_VERSION, Instant.now(), Collections.emptyMap());
+  }
+
+  /**
+   * Instantiate an initial instance with default version info and provided 
property map.
+   *
+   * @param props
+   *          optional map of initial property key, value pairs. The 
properties are assumed to have
+   *          been previously validated (if required)
+   */
+  public VersionedProperties(Map<String,String> props) {
+    this(NO_VERSION, Instant.now(), props);
+  }
+
+  /**
+   * Instantiate an instance and set the initial properties to the provided 
values.
+   *
+   * @param dataVersion
+   *          version info with data version and timestamp.
+   * @param timestamp
+   *          timestamp of this version.
+   * @param props
+   *          optional map of initial property key, value pairs. The 
properties are assumed to have
+   *          been previously validated (if required)
+   */
+  public VersionedProperties(final int dataVersion, final Instant timestamp,
+      final Map<String,String> props) {
+    this.dataVersion = dataVersion;
+    this.timestamp = timestamp;
+    if (Objects.nonNull(props)) {
+      this.props = new 
ImmutableMap.Builder<String,String>().putAll(props).build();
+    } else {
+      this.props = new ImmutableMap.Builder<String,String>().build();
+    }
+  }
+
+  /**
+   * Get an unmodifiable map with all property key,value pairs.
+   *
+   * @return An unmodifiable view of the property key, value pairs.
+   */
+  public Map<String,String> getProperties() {
+    return props;
+  }
+
+  /**
+   * Get the current data version. The version should match the node version 
of the stored data. The
+   * value should be used on data writes as the expected version. If the data 
write fails due to an
+   * unexpected version, it signals that the node version has changed since 
the instance was
+   * instantiated and encoded.
+   *
+   * @return 0 for initial version, otherwise the data version when the 
properties were serialized.
+   */
+  public int getDataVersion() {
+    return Math.max(dataVersion, 0);
+  }
+
+  /**
+   * Calculates the version that should be stored when serialized. The 
serialized version, when
+   * stored, should match the version that will be assigned. This way, data 
reading the serialized
+   * version can compare the stored version with the node version at any time 
to detect if the node
+   * version has been updated.
+   * <p>
+   * The initialization of the data version to a negative value allows this 
value to be calculated
+   * correctly for the first serialization. On the first store, the expected 
version will be 0.
+   *
+   * @return the next version number that should be serialized, or 0 if this 
is the initial version.
+   */
+  public int getNextVersion() {
+    return Math.max(dataVersion + 1, 0);
+  }
+
+  /**
+   * The timestamp of the instance when created or last modified.
+   *
+   * @return the timestamp of the instance.
+   */
+  public Instant getTimestamp() {
+    return Instant.from(timestamp);
+  }
+
+  /**
+   * The timestamp formatted as an ISO 8601 string with format of
+   * {@code YYYY-MM-DDTHH:mm:ss.SSSSSSZ}
+   *
+   * @return a formatted timestamp string.
+   */
+  public String getTimestampISO() {
+    return tsFormatter.format(timestamp);
+  }
+
+  /**
+   * Update a single property. If a property already exists it is overwritten.
+   * <p>
+   * It is much more efficient to add multiple properties at a time rather 
than one by one.
+   * <p>
+   * Because instances of this class are immutable, this method creates a new 
copy of the
+   * properties. Other processes will continue to see original values 
retrieved from the data store.
+   * Other processes will receive an update when the instance is encoded and 
stored in the data
+   * store and then retrieved with the normal store update mechanisms.
+   *
+   * @param key
+   *          the property name.
+   * @param value
+   *          the property value.
+   * @return A new instance of this class with the property added or updated.
+   */
+  public VersionedProperties addOrUpdate(final String key, final String value) 
{
+    ImmutableMap<String,String> updated =
+        ImmutableMap.<String,String>builder().putAll(new HashMap<>() {
+          {
+            putAll(props);
+            put(key, value);
+          }
+        }).build();

Review comment:
       Avoid unnecessary complexity, since the constructor will make an 
immutable copy anyway and this is easier to understand and maintain:
   ```suggestion
       var updated = new HashMap<>(props);
       updated.put(key, value);
   ```

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropCodecTest.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+/**
+ * Exercise the base class specific methods - most testing will occur in 
subclasses
+ */
+public class VersionedPropCodecTest {
+
+  @Test(expected = IllegalArgumentException.class)
+  public void invalidEncodingNullArray() {
+    VersionedPropCodec.getEncodingVersion(null);
+  }
+
+  @Test
+  public void validEncoding() {
+    // length so that array reads do not error
+    byte[] bytes = new byte[100];
+    int encodingVersion = VersionedPropCodec.getEncodingVersion(bytes);
+    assertEquals(0, encodingVersion);
+  }
+
+  /**
+   * The timestamp will be invalid - this should cause a timestamp parse error 
that will be remapped
+   * to an IllegalArgumentException.
+   */
+  @Test(expected = IllegalArgumentException.class)
+  public void getDataVersionBadTimestamp() {
+    // length so that array reads do not error
+    byte[] bytes = new byte[100];
+    assertEquals(0, VersionedPropCodec.getDataVersion(bytes));
+  }
+
+  @Test
+  public void goPath() throws IOException {
+
+    int aVersion = 13;
+    Instant now = Instant.now();
+
+    Map<String,String> p = new HashMap<>();
+    p.put("k1", "v1");
+
+    VersionedProperties vProps = new VersionedProperties(aVersion, now, p);

Review comment:
       ```suggestion
       var vProps = new VersionedProperties(aVersion, Instant.now(), 
Map.of("k1", "v1"));
   ```

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodecTest.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VersionedPropEncryptCodecTest {
+
+  private final Logger log = 
LoggerFactory.getLogger(VersionedPropEncryptCodecTest.class);
+
+  /**
+   * Perform a round trip - encode, decode set of operations.
+   *
+   * @throws Exception
+   *           an exception is a test failure.
+   */
+  @Test
+  public void roundTripSample() throws Exception {
+
+    // set-up sample "secret" key - for testing only.
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropEncryptCodec.GCMCipherParams cipherProps =
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt);
+
+    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
+    cipher.init(Cipher.ENCRYPT_MODE, cipherProps.getSecretKey(), 
cipherProps.getParameterSpec());
+
+    byte[] payload;
+
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+
+      CipherOutputStream cos = new CipherOutputStream(bos, cipher);
+
+      DataOutputStream dos = new DataOutputStream(cos);
+
+      dos.writeUTF("A");
+      dos.writeUTF("B");
+      dos.writeUTF("C");
+
+      cos.flush();
+      cos.close();
+
+      payload = bos.toByteArray();
+
+      log.debug("Output: {}", payload);
+
+    }
+
+    cipher.init(Cipher.DECRYPT_MODE, cipherProps.getSecretKey(), 
cipherProps.getParameterSpec());
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(payload)) {
+
+      // write the property map keys, values.
+      try (CipherInputStream cis = new CipherInputStream(bis, cipher);
+
+          DataInputStream cdis = new DataInputStream(cis)) {
+
+        assertEquals("A", cdis.readUTF());
+        assertEquals("B", cdis.readUTF());
+        assertEquals("C", cdis.readUTF());
+      }
+    }
+  }
+
+  /**
+   * Validate versioning with something other than default.
+   */
+  @Test
+  public void roundTripEncryption() throws Exception {
+
+    int aVersion = 13;
+    Instant now = Instant.now();
+
+    Map<String,String> p = new HashMap<>();
+    p.put("k1", "v1");
+
+    VersionedProperties vProps = new VersionedProperties(aVersion, now, p);

Review comment:
       ```suggestion
       var vProps = new VersionedProperties(aVersion, Instant.now(), 
Map.of("k1", "v1"));
   ```

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/codec/VersionedPropCodec.java
##########
@@ -0,0 +1,285 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static 
org.apache.accumulo.server.conf.codec.VersionedProperties.tsFormatter;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.time.Instant;
+import java.time.format.DateTimeParseException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Abstract class to provide encoding / decoding of versioned properties. This 
class handles the
+ * serialization of the metadata and subclasses are required to implement
+ * {@link #encodePayload(OutputStream, VersionedProperties, EncodingOptions)} 
and
+ * {@link #decodePayload(InputStream, EncodingOptions)} to handle any specific 
implementation
+ * metadata (optional) and the property map according to the encoding scheme 
of the subclass.
+ * <p>
+ * The basic encoding format:
+ * <ul>
+ * <li>encoding metadata - specifies codec to be used</li>
+ * <li>version metadata - specifies property versioning information</li>
+ * <li>codec specific metadata (optional)</li>
+ * <li>the property map</li>
+ * </ul>
+ *
+ */
+public abstract class VersionedPropCodec {
+
+  private final EncodingOptions encodingOpts;
+
+  public VersionedPropCodec(final EncodingOptions encodingOpts) {
+    this.encodingOpts = encodingOpts;
+  }
+
+  /**
+   * The general encoding options that apply to all encodings.
+   *
+   * @return the general options.
+   */
+  public EncodingOptions getEncodingOpts() {
+    return encodingOpts;
+  }
+
+  /**
+   * Serialize the versioned properties. The version information on the 
properties is updated if the
+   * data is successfully serialized.
+   *
+   * @param vProps
+   *          the versioned properties.
+   * @return a byte array with the serialized properties.
+   */
+  public byte[] toBytes(final VersionedProperties vProps) throws IOException {
+
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream();
+        DataOutputStream dos = new DataOutputStream(bos)) {
+
+      // write encoding metadata
+      encodingOpts.encode(dos);
+
+      // write version metadata
+      DataVersionInfo vMetadata =
+          new DataVersionInfo(vProps.getNextVersion(), vProps.getTimestamp());
+      vMetadata.write(dos);
+
+      // delegate property encoding to sub-class
+      encodePayload(bos, vProps, encodingOpts);
+
+      return bos.toByteArray();
+    }
+  }
+
+  /**
+   * Encode the properties and optionally any specific encoding metadata that 
is necessary to decode
+   * the payload with the scheme chosen.
+   *
+   * @param out
+   *          an output stream
+   * @param vProps
+   *          the versioned properties
+   * @param encodingOpts
+   *          the general encoding options.
+   * @throws IOException
+   *           if an error occurs writing to the underlying output stream.
+   */
+  abstract void encodePayload(final OutputStream out, final 
VersionedProperties vProps,
+      final EncodingOptions encodingOpts) throws IOException;
+
+  public VersionedProperties fromBytes(final byte[] bytes) throws IOException {
+
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        DataInputStream dis = new DataInputStream(bis)) {
+
+      EncodingOptions encodingOpts = new EncodingOptions(dis);
+
+      if (!checkCanDecodeVersion(encodingOpts)) {
+        throw new IllegalArgumentException(
+            "Invalid data version - cannot process the version read: "
+                + encodingOpts.getEncodingVersion());
+      }
+
+      DataVersionInfo vMetadata = new DataVersionInfo(dis);
+
+      Map<String,String> props = decodePayload(bis, encodingOpts);
+
+      return new VersionedProperties(vMetadata.getDataVersion(), 
vMetadata.getTimestamp(), props);
+    }
+  }
+
+  abstract boolean checkCanDecodeVersion(final EncodingOptions encodingOpts);
+
+  /**
+   * Extracts the encoding version from the encoded byte array without fully 
decoding the payload.
+   * This is a convenience method if multiple encodings are present, and 
should only be required if
+   * upgrading / changing encodings, otherwise a single encoding should be in 
operation for an
+   * instance at any given time.
+   *
+   * @param bytes
+   *          serialized encoded versioned property byte array.
+   * @return the encoding version used to serialize the properties.
+   */
+  public static int getEncodingVersion(final byte[] bytes) {
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        DataInputStream dis = new DataInputStream(bis)) {
+      return new EncodingOptions(dis).getEncodingVersion();
+    } catch (NullPointerException | IOException ex) {
+      throw new IllegalArgumentException("Failed to read encoding version from 
byte array provided",
+          ex);
+    }
+  }
+
+  /**
+   * Extracts the data version from the encoded byte array without fully 
decoding the payload.
+   * Normally the data version should be obtained from a fully decoded 
instance of the versioned
+   * properties.
+   * <p>
+   * The cost of reading the byte array from the backing store should be 
considered versus the
+   * additional cost of decoding - with a goal of reducing data reads from the 
store preferred.
+   * Generally reading from the store will be followed by some sort of usage 
which would require the
+   * full decode operation anyway.
+   *
+   * @param bytes
+   *          serialized encoded versioned property byte array.
+   * @return the data version read from the serialized properties.
+   */
+  public static int getDataVersion(final byte[] bytes) {
+    try (ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
+        DataInputStream dis = new DataInputStream(bis)) {
+      // skip encoding metadata
+      new EncodingOptions(dis);
+      return new DataVersionInfo(dis).getDataVersion();
+    } catch (NullPointerException | IOException ex) {
+      throw new IllegalArgumentException(
+          "Failed to read data version version from byte array provided", ex);
+    }
+  }
+
+  /**
+   * Decode the payload and any optional encoding specific metadata and return 
a map of the property
+   * name, value pairs.
+   *
+   * @param inStream
+   *          an input stream
+   * @param encodingOpts
+   *          the general encoding options.
+   * @return a map of properties name, value pairs.
+   * @throws IOException
+   *           if an exception occurs reading from the input stream.
+   */
+  abstract Map<String,String> decodePayload(final InputStream inStream,
+      final EncodingOptions encodingOpts) throws IOException;
+
+  /**
+   * Read the property map from a data input stream as UTF strings. The input 
stream should be
+   * created configured by sub-classes for the output of the sub-class. If the 
sub-class uses an
+   * encoding other than UTF strings, they should override this method. An 
example would be an
+   * encoding that uses JSON to encode the map.
+   * <p>
+   * Handling the properties as UTF strings is one implementation. 
Subclasses can implement
+   * different mechanism if desired, one example might be using a JSON 
implementation to encode /
+   * decode the properties.
+   *
+   * @param dis
+   *          a data input stream
+   * @return the property map
+   * @throws IOException
+   *           if an exception occurs reading from the stream.
+   */
+  Map<String,String> readMapAsUTF(DataInputStream dis) throws IOException {
+
+    Map<String,String> aMap = new HashMap<>();
+    int items = dis.readInt();
+
+    for (int i = 0; i < items; i++) {
+      String k = dis.readUTF();
+      String v = dis.readUTF();
+      aMap.put(k, v);
+    }
+    return aMap;
+  }
+
+  /**
+   * Write the property map to the data output stream. The underlying stream 
is not closed by this
+   * method.
+   * <p>
+   * Handling the properties as UTF strings is one implementation. 
Subclasses can implement
+   * different mechanism if desired, one example might be using a JSON 
implementation to encode /
+   * decode the properties.
+   *
+   * @param dos
+   *          a data output stream
+   * @param aMap
+   *          the property map of k, v string pairs.
+   * @throws IOException
+   *           if an exception occurs.
+   */
+  void writeMapAsUTF(final DataOutputStream dos, final Map<String,String> 
aMap) throws IOException {
+
+    dos.writeInt(aMap.size());
+
+    for (Map.Entry<String,String> e : aMap.entrySet()) {
+      dos.writeUTF(e.getKey());
+      dos.writeUTF(e.getValue());
+    }
+    dos.flush();
+  }
+
+  /**
+   * Helper class for reading / writing versioned properties metadata.
+   */
+  static class DataVersionInfo {
+    private final int dataVersion;
+    private final Instant timestamp;
+

Review comment:
       This helper class doesn't appear to be used. Is this for later or 
leftover from a previous iteration?

##########
File path: 
server/base/src/main/java/org/apache/accumulo/server/conf/codec/EncodingOptions.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.StringJoiner;
+
+/**
+ * Serialization metadata to allow for evolution of the encoding used for 
property storage. This
+ * info is expected to be stored first in the serialization and uncompressed 
so that the handling of
+ * subsequent fields and data can be processed correctly and without 
additional processing.
+ * <p>
+ * Instances of this class are immutable.
+ */
+public class EncodingOptions {
+
+  // Adding an encoding version must be done as an addition. Do not change or 
delete previous
+  // version numbers
+  public static final int EncodingVersion_1_0 = 1;
+  public static final int EXPERIMENTAL_CIPHER_ENCODING_1_0 = 999;

Review comment:
       The experimental cipher encoding is only used in test. I think it could 
be moved there.

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropGzipCodecTest.java
##########
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Exercise the {@link VersionedPropGzipCodec} class.
+ */
+public class VersionedPropGzipCodecTest {
+
+  private static final Logger log = 
LoggerFactory.getLogger(VersionedPropGzipCodecTest.class);
+
+  @Test
+  public void roundTripUncompressed() throws IOException {
+
+    Map<String,String> p = new HashMap<>();
+    p.put("k1", "v1");

Review comment:
       Can inline `Instant.now()` and use `Map.of()` to define these maps and 
inline them for easier readability (as I did in previous comments, but am too 
lazy to do for this class! :smiley_cat: )

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropCodecTest.java
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Test;
+
+/**
+ * Exercise the base class specific methods - most testing will occur in 
subclasses
+ */
+public class VersionedPropCodecTest {
+
+  @Test(expected = IllegalArgumentException.class)
+  public void invalidEncodingNullArray() {
+    VersionedPropCodec.getEncodingVersion(null);
+  }
+
+  @Test
+  public void validEncoding() {
+    // length so that array reads do not error
+    byte[] bytes = new byte[100];
+    int encodingVersion = VersionedPropCodec.getEncodingVersion(bytes);
+    assertEquals(0, encodingVersion);
+  }
+
+  /**
+   * The timestamp will be invalid - this should cause a timestamp parse error 
that will be remapped
+   * to an IllegalArgumentException.
+   */
+  @Test(expected = IllegalArgumentException.class)

Review comment:
       Could use `assertThrows()` to ensure the specific line throws the 
desired exception (can also optionally do checks on the resulting exception).

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodec.java
##########
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static 
org.apache.accumulo.server.conf.codec.EncodingOptions.EXPERIMENTAL_CIPHER_ENCODING_1_0;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.InvalidAlgorithmParameterException;
+import java.security.InvalidKeyException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.spec.InvalidKeySpecException;
+import java.security.spec.KeySpec;
+import java.util.Map;
+import java.util.zip.GZIPInputStream;
+import java.util.zip.GZIPOutputStream;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.NoSuchPaddingException;
+import javax.crypto.SecretKey;
+import javax.crypto.SecretKeyFactory;
+import javax.crypto.spec.GCMParameterSpec;
+import javax.crypto.spec.PBEKeySpec;
+import javax.crypto.spec.SecretKeySpec;
+
+/**
+ * EXPERIMENTAL - demonstrates using an alternate encoding scheme. The sample 
is completely
+ * functional, however, certain elements such as password / key handling may 
not be suitable for
+ * production. The encoding version is EXPERIMENTAL_CIPHER_ENCODING_1_0.
+ * <p>
+ * This codec uses the AES algorithm in GCM mode for encryption to encode the 
property map that is
+ * stored in the external store.
+ */
+public class VersionedPropEncryptCodec extends VersionedPropCodec {

Review comment:
       I'd keep this in test. The experimental annotation is only useful on 
public API, and really then, only to generate docs. There's no value in making 
it more visible to expand its scope. Encrypting the properties in ZK is a neat 
idea, but quite a bit out of scope of the main work to improve property storage 
in ZK, and I think it's better if we keep this stuff in test for now.

##########
File path: 
server/base/src/test/java/org/apache/accumulo/server/conf/codec/VersionedPropEncryptCodecTest.java
##########
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.accumulo.server.conf.codec;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.security.NoSuchAlgorithmException;
+import java.time.Instant;
+import java.util.Base64;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.crypto.Cipher;
+import javax.crypto.CipherInputStream;
+import javax.crypto.CipherOutputStream;
+import javax.crypto.KeyGenerator;
+import javax.crypto.SecretKey;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class VersionedPropEncryptCodecTest {
+
+  private final Logger log = 
LoggerFactory.getLogger(VersionedPropEncryptCodecTest.class);
+
+  /**
+   * Perform a round trip - encode, decode set of operations.
+   *
+   * @throws Exception
+   *           an exception is a test failure.
+   */
+  @Test
+  public void roundTripSample() throws Exception {
+
+    // set-up sample "secret" key - for testing only.
+    final char[] pass = {'a', 'b', 'c'};
+    final byte[] salt = {1, 2, 3};
+
+    VersionedPropEncryptCodec.GCMCipherParams cipherProps =
+        new VersionedPropEncryptCodec.GCMCipherParams(pass, salt);
+
+    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
+    cipher.init(Cipher.ENCRYPT_MODE, cipherProps.getSecretKey(), 
cipherProps.getParameterSpec());
+
+    byte[] payload;
+
+    try (ByteArrayOutputStream bos = new ByteArrayOutputStream()) {
+
+      CipherOutputStream cos = new CipherOutputStream(bos, cipher);
+
+      DataOutputStream dos = new DataOutputStream(cos);
+
+      dos.writeUTF("A");
+      dos.writeUTF("B");
+      dos.writeUTF("C");
+
+      cos.flush();

Review comment:
       Redundant flush (close on next line implies flush):
   ```suggestion
   ```




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to