This is an automated email from the ASF dual-hosted git repository.

sammichen pushed a commit to branch ozone-0.6.0
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git

commit d97fa2823ea2de380a68dc83e050641cde4a258a
Author: Istvan Fajth <[email protected]>
AuthorDate: Tue Jul 14 23:30:45 2020 +0200

    HDDS-3925. SCM Pipeline DB should directly use UUID bytes for key rather than rely on proto serialization for key. (#1197)
    
    (cherry picked from commit 0a1cce5b98c22f72b31ca13a8ef2734f86b0bec4)
---
 .../hadoop/hdds/utils/db/RDBStoreIterator.java     |  16 ++
 .../org/apache/hadoop/hdds/utils/db/RDBTable.java  |   2 +-
 .../apache/hadoop/hdds/utils/db/TableIterator.java |   8 +
 .../apache/hadoop/hdds/utils/db/TypedTable.java    |   5 +
 .../hadoop/hdds/utils/db/TestRDBStoreIterator.java | 224 +++++++++++++++++++++
 .../hadoop/hdds/scm/metadata/PipelineIDCodec.java  |  38 +++-
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  54 ++++-
 .../hdds/scm/metadata/TestPipelineIDCodec.java     | 144 +++++++++++++
 .../hdds/scm/pipeline/TestSCMPipelineManager.java  | 115 +++++++++++
 9 files changed, 602 insertions(+), 4 deletions(-)
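
For context, a minimal sketch (not part of the commit) of the key-format change; the PipelineID accessors match the ones used in the diff below, while the helper name is made up:

    // Sketch only. Old key format: the protobuf serialization of the PipelineID
    // message, i.e. pipelineId.getProtobuf().toByteArray(). New key format: the
    // raw 16 UUID bytes, most significant half first.
    static byte[] newStyleKey(java.util.UUID id) {
      java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(16);
      buf.putLong(id.getMostSignificantBits());   // bytes 0..7
      buf.putLong(id.getLeastSignificantBits());  // bytes 8..15
      return buf.array();
    }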

diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java
index 784738b..5902486 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java
@@ -32,12 +32,18 @@ public class RDBStoreIterator
     implements TableIterator<byte[], ByteArrayKeyValue> {
 
   private RocksIterator rocksDBIterator;
+  private RDBTable rocksDBTable;
 
   public RDBStoreIterator(RocksIterator iterator) {
     this.rocksDBIterator = iterator;
     rocksDBIterator.seekToFirst();
   }
 
+  public RDBStoreIterator(RocksIterator iterator, RDBTable table) {
+    this(iterator);
+    this.rocksDBTable = table;
+  }
+
   @Override
   public void forEachRemaining(
       Consumer<? super ByteArrayKeyValue> action) {
@@ -101,6 +107,16 @@ public class RDBStoreIterator
   }
 
   @Override
+  public void removeFromDB() throws IOException {
+    if (rocksDBTable == null) {
+      throw new UnsupportedOperationException("remove");
+    }
+    if (rocksDBIterator.isValid()) {
+      rocksDBTable.delete(rocksDBIterator.key());
+    }
+  }
+
+  @Override
   public void close() throws IOException {
     rocksDBIterator.close();
   }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
index 2e390e2..4dbb59a 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBTable.java
@@ -206,7 +206,7 @@ class RDBTable implements Table<byte[], byte[]> {
   public TableIterator<byte[], ByteArrayKeyValue> iterator() {
     ReadOptions readOptions = new ReadOptions();
     readOptions.setFillCache(false);
-    return new RDBStoreIterator(db.newIterator(handle, readOptions));
+    return new RDBStoreIterator(db.newIterator(handle, readOptions), this);
   }
 
   @Override
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableIterator.java
index a684157..c9bc045 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableIterator.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TableIterator.java
@@ -60,4 +60,12 @@ public interface TableIterator<KEY, T> extends Iterator<T>, Closeable {
    */
   T value();
 
+  /**
+   * Remove the entry the iterator currently points at from the database
+   * table the iterator is working on.
+   *
+   * @throws IOException when an error occurs during deletion.
+   */
+  void removeFromDB() throws IOException;
+
 }
diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
index 86d23af..1451946 100644
--- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
+++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/TypedTable.java
@@ -420,5 +420,10 @@ public class TypedTable<KEY, VALUE> implements Table<KEY, VALUE> {
       return new TypedKeyValue(rawIterator.next(), keyType,
           valueType);
     }
+
+    @Override
+    public void removeFromDB() throws IOException {
+      rawIterator.removeFromDB();
+    }
   }
 }
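
A hedged usage sketch of the new removeFromDB() hook (the table and the mismatch check are illustrative, not part of this commit):

    // Sketch only: walk a table and delete entries whose stored key no longer
    // matches the id carried in the value. removeFromDB() (added above) deletes
    // the entry the iterator currently points at.
    static void dropMismatchedKeys(Table<PipelineID, Pipeline> table)
        throws IOException {
      try (TableIterator<PipelineID, ? extends Table.KeyValue<PipelineID, Pipeline>>
               it = table.iterator()) {
        while (it.hasNext()) {
          Table.KeyValue<PipelineID, Pipeline> kv = it.next();
          if (!kv.getKey().equals(kv.getValue().getId())) {
            it.removeFromDB();
          }
        }
      }
    }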
diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java
new file mode 100644
index 0000000..6e85977
--- /dev/null
+++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+import org.rocksdb.RocksIterator;
+
+import java.util.NoSuchElementException;
+import java.util.function.Consumer;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * This test prescribes the expected behaviour of RDBStoreIterator, which wraps
+ * RocksDB's own iterator. Ozone internally uses RDBStoreIterator in
+ * TypedTableIterator to provide iteration over table elements in a typed
+ * manner. The tests ensure we access RocksDB via the iterator properly.
+ */
+public class TestRDBStoreIterator {
+
+  private RocksIterator rocksDBIteratorMock;
+  private RDBTable rocksTableMock;
+
+  @Before
+  public void setup() {
+    rocksDBIteratorMock = mock(RocksIterator.class);
+    rocksTableMock = mock(RDBTable.class);
+  }
+
+  @Test
+  public void testForeachRemainingCallsConsumerWithAllElements() {
+    when(rocksDBIteratorMock.isValid())
+        .thenReturn(true, true, true, true, true, true, false);
+    when(rocksDBIteratorMock.key())
+        .thenReturn(new byte[]{0x00}, new byte[]{0x01}, new byte[]{0x02})
+        .thenThrow(new NoSuchElementException());
+    when(rocksDBIteratorMock.value())
+        .thenReturn(new byte[]{0x7f}, new byte[]{0x7e}, new byte[]{0x7d})
+        .thenThrow(new NoSuchElementException());
+
+
+    Consumer<ByteArrayKeyValue> consumerStub = mock(Consumer.class);
+
+    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
+    iter.forEachRemaining(consumerStub);
+
+    ArgumentCaptor<ByteArrayKeyValue> capture =
+        ArgumentCaptor.forClass(ByteArrayKeyValue.class);
+    verify(consumerStub, times(3)).accept(capture.capture());
+    assertArrayEquals(
+        new byte[]{0x00}, capture.getAllValues().get(0).getKey());
+    assertArrayEquals(
+        new byte[]{0x7f}, capture.getAllValues().get(0).getValue());
+    assertArrayEquals(
+        new byte[]{0x01}, capture.getAllValues().get(1).getKey());
+    assertArrayEquals(
+        new byte[]{0x7e}, capture.getAllValues().get(1).getValue());
+    assertArrayEquals(
+        new byte[]{0x02}, capture.getAllValues().get(2).getKey());
+    assertArrayEquals(
+        new byte[]{0x7d}, capture.getAllValues().get(2).getValue());
+  }
+
+  @Test
+  public void testHasNextDependsOnIsValid() {
+    when(rocksDBIteratorMock.isValid()).thenReturn(true, false);
+
+    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
+
+    assertTrue(iter.hasNext());
+    assertFalse(iter.hasNext());
+  }
+
+  @Test
+  public void testNextCallsIsValidThenGetsTheValueAndStepsToNext() {
+    when(rocksDBIteratorMock.isValid()).thenReturn(true);
+    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
+
+    InOrder verifier = inOrder(rocksDBIteratorMock);
+
+    iter.next();
+
+    verifier.verify(rocksDBIteratorMock).isValid();
+    verifier.verify(rocksDBIteratorMock).key();
+    verifier.verify(rocksDBIteratorMock).value();
+    verifier.verify(rocksDBIteratorMock).next();
+  }
+
+  @Test
+  public void testConstructorSeeksToFirstElement() {
+    new RDBStoreIterator(rocksDBIteratorMock);
+
+    verify(rocksDBIteratorMock, times(1)).seekToFirst();
+  }
+
+  @Test
+  public void testSeekToFirstSeeks() {
+    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
+
+    iter.seekToFirst();
+
+    verify(rocksDBIteratorMock, times(2)).seekToFirst();
+  }
+
+  @Test
+  public void testSeekToLastSeeks() {
+    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
+
+    iter.seekToLast();
+
+    verify(rocksDBIteratorMock, times(1)).seekToLast();
+  }
+
+  @Test
+  public void testSeekReturnsTheActualKey() {
+    when(rocksDBIteratorMock.isValid()).thenReturn(true);
+    when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00});
+    when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f});
+
+    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
+    ByteArrayKeyValue val = iter.seek(new byte[]{0x55});
+
+    InOrder verifier = inOrder(rocksDBIteratorMock);
+
+    verify(rocksDBIteratorMock, times(1)).seekToFirst(); // at construction time
+    verify(rocksDBIteratorMock, never()).seekToLast();
+    verifier.verify(rocksDBIteratorMock, times(1)).seek(any(byte[].class));
+    verifier.verify(rocksDBIteratorMock, times(1)).isValid();
+    verifier.verify(rocksDBIteratorMock, times(1)).key();
+    verifier.verify(rocksDBIteratorMock, times(1)).value();
+    assertArrayEquals(new byte[]{0x00}, val.getKey());
+    assertArrayEquals(new byte[]{0x7f}, val.getValue());
+  }
+
+  @Test
+  public void testGettingTheKeyIfIteratorIsValid() {
+    when(rocksDBIteratorMock.isValid()).thenReturn(true);
+    when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00});
+
+    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
+    byte[] key = iter.key();
+
+    InOrder verifier = inOrder(rocksDBIteratorMock);
+
+    verifier.verify(rocksDBIteratorMock, times(1)).isValid();
+    verifier.verify(rocksDBIteratorMock, times(1)).key();
+    assertArrayEquals(new byte[]{0x00}, key);
+  }
+
+  @Test
+  public void testGettingTheValueIfIteratorIsValid() {
+    when(rocksDBIteratorMock.isValid()).thenReturn(true);
+    when(rocksDBIteratorMock.key()).thenReturn(new byte[]{0x00});
+    when(rocksDBIteratorMock.value()).thenReturn(new byte[]{0x7f});
+
+    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
+    ByteArrayKeyValue val = iter.value();
+
+    InOrder verifier = inOrder(rocksDBIteratorMock);
+
+    verifier.verify(rocksDBIteratorMock, times(1)).isValid();
+    verifier.verify(rocksDBIteratorMock, times(1)).key();
+    assertArrayEquals(new byte[]{0x00}, val.getKey());
+    assertArrayEquals(new byte[]{0x7f}, val.getValue());
+  }
+
+  @Test
+  public void testRemovingFromDBActuallyDeletesFromTable() throws Exception {
+    byte[] testKey = new byte[]{0x00};
+    when(rocksDBIteratorMock.isValid()).thenReturn(true);
+    when(rocksDBIteratorMock.key()).thenReturn(testKey);
+
+    RDBStoreIterator iter =
+        new RDBStoreIterator(rocksDBIteratorMock, rocksTableMock);
+    iter.removeFromDB();
+
+    InOrder verifier = inOrder(rocksDBIteratorMock, rocksTableMock);
+
+    verifier.verify(rocksDBIteratorMock, times(1)).isValid();
+    verifier.verify(rocksTableMock, times(1)).delete(testKey);
+  }
+
+  @Test(expected = UnsupportedOperationException.class)
+  public void testRemoveFromDBWithoutDBTableSet() throws Exception {
+    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
+    iter.removeFromDB();
+  }
+
+  @Test
+  public void testCloseCloses() throws Exception {
+    RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock);
+    iter.close();
+
+    verify(rocksDBIteratorMock, times(1)).close();
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java
index d661e34..e73539f 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java
@@ -19,6 +19,9 @@
 package org.apache.hadoop.hdds.scm.metadata;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.UUID;
 
 import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
 import org.apache.hadoop.hdds.utils.db.Codec;
@@ -30,12 +33,43 @@ public class PipelineIDCodec implements Codec<PipelineID> {
 
   @Override
   public byte[] toPersistedFormat(PipelineID object) throws IOException {
-    return object.getProtobuf().toByteArray();
+    byte[] bytes = new byte[16];
+    System.arraycopy(
+        asByteArray(object.getId().getMostSignificantBits()), 0, bytes, 0, 8);
+    System.arraycopy(
+        asByteArray(object.getId().getLeastSignificantBits()), 0, bytes, 8, 8);
+    return bytes;
+  }
+
+  private byte[] asByteArray(long bits) {
+    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+    buffer.putLong(bits);
+    return buffer.array();
   }
 
   @Override
   public PipelineID fromPersistedFormat(byte[] rawData) throws IOException {
-    return null;
+    long mostSignificantBits = toLong(rawData, 0);
+    long leastSignificantBits = toLong(rawData, 8);
+
+    UUID id = new UUID(mostSignificantBits, leastSignificantBits);
+    return PipelineID.valueOf(id);
+  }
+
+  private long toLong(byte[] arr, int startIdx) throws IOException {
+    if (arr.length < startIdx + 8) {
+      throw new IOException("Key conversion error.",
+          new ArrayIndexOutOfBoundsException(
+              "Key does not have the least expected amount of bytes,"
+                  + "and does not contain a UUID. Key: "
+                  + Arrays.toString(arr)
+          )
+      );
+    }
+    ByteBuffer buffer = ByteBuffer.allocate(Long.BYTES);
+    buffer.put(arr, startIdx, 8);
+    buffer.flip();
+    return buffer.getLong();
   }
 
   @Override
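
A quick usage sketch of the reworked codec (the codec calls are from this commit; the surrounding method is illustrative):

    // Sketch: a PipelineID now round-trips through a fixed 16-byte key.
    static void roundTrip() throws IOException {
      PipelineIDCodec codec = new PipelineIDCodec();
      PipelineID id = PipelineID.randomId();

      byte[] key = codec.toPersistedFormat(id);        // always 16 bytes
      PipelineID back = codec.fromPersistedFormat(key);
      assert id.equals(back);

      // A key shorter than 16 bytes fails fast with an IOException wrapping
      // an ArrayIndexOutOfBoundsException (see toLong above).
    }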
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index e8223ca..fda9371 100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -161,12 +161,64 @@ public class SCMPipelineManager implements PipelineManager {
     TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>>
         iterator = pipelineStore.iterator();
     while (iterator.hasNext()) {
-      Pipeline pipeline = iterator.next().getValue();
+      Pipeline pipeline = nextPipelineFromIterator(iterator);
       stateManager.addPipeline(pipeline);
       nodeManager.addPipeline(pipeline);
     }
   }
 
+  private Pipeline nextPipelineFromIterator(
+      TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it
+  ) throws IOException {
+    KeyValue<PipelineID, Pipeline> actual = it.next();
+    Pipeline pipeline = actual.getValue();
+    PipelineID pipelineID = actual.getKey();
+    checkKeyAndReplaceIfObsolete(it, pipeline, pipelineID);
+    return pipeline;
+  }
+
+  /**
+   * This method is part of the change introduced in HDDS-3925, and it can
+   * and should be removed later on.
+   * The purpose of the change is to get rid of protobuf serialization in the
+   * SCM database Pipeline table keys. The keys are not used anywhere, and the
+   * PipelineID that is used as a key is in the value as well, so we can detect
+   * a change in the key's byte[] encoding, and if we find the old format we
+   * refresh the table contents during SCM startup.
+   *
+   * If the removal fails, the IOException comes from RocksDB itself; in that
+   * case the in-memory structures are still fine and SCM should remain
+   * operational, and we will attempt to replace the old key at the next
+   * startup. Until then, removing the pipeline leaves the old-keyed entry in
+   * RocksDB, and the next startup will attempt to delete it again. This does
+   * not affect any runtime operations.
+   * If a Pipeline should have been deleted but remained in RocksDB, then at
+   * the next startup it is replaced and re-added with the new key, after
+   * which SCM detects that it is an invalid Pipeline and successfully deletes
+   * it with the new key.
+   * For further info check the JIRA.
+   *
+   * @param it the iterator used to iterate the Pipeline table
+   * @param pipeline the pipeline already read from the iterator
+   * @param pipelineID the pipeline ID read from the raw data via the iterator
+   */
+  private void checkKeyAndReplaceIfObsolete(
+      TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it,
+      Pipeline pipeline,
+      PipelineID pipelineID
+  ) {
+    if (!pipelineID.equals(pipeline.getId())) {
+      try {
+        it.removeFromDB();
+        pipelineStore.put(pipeline.getId(), pipeline);
+      } catch (IOException e) {
+        LOG.info("Pipeline table in RocksDB has an old key format, and "
+            + "removing the pipeline with the old key was unsuccessful."
+            + "Pipeline: {}", pipeline);
+      }
+    }
+  }
+
   private void recordMetricsForPipeline(Pipeline pipeline) {
     metrics.incNumPipelineAllocated();
     if (pipeline.isOpen()) {
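
A hedged sketch of the detection idea behind checkKeyAndReplaceIfObsolete (the pipeline argument is a placeholder; the calls mirror what the code above and the new test below do):

    // Sketch: an old-format key holds the protobuf bytes of the PipelineID, so
    // decoding it with the new codec yields a UUID built from the first 16 bytes
    // of that protobuf message, which will not equal the id stored in the value.
    static boolean hasObsoleteKey(Pipeline pipeline) throws IOException {
      byte[] oldStyleKey = pipeline.getId().getProtobuf().toByteArray();
      PipelineID decoded = new PipelineIDCodec().fromPersistedFormat(oldStyleKey);
      return !decoded.equals(pipeline.getId());  // true for rows written with the old key
    }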
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestPipelineIDCodec.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestPipelineIDCodec.java
new file mode 100644
index 0000000..5543be5
--- /dev/null
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/metadata/TestPipelineIDCodec.java
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.metadata;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.junit.Test;
+
+import java.util.UUID;
+
+/**
+ * Testing serialization of PipelineID objects to/from RocksDB.
+ */
+public class TestPipelineIDCodec {
+
+  @Test
+  public void testPersistingZeroAsUUID() throws Exception {
+    long leastSigBits = 0x0000_0000_0000_0000L;
+    long mostSigBits = 0x0000_0000_0000_0000L;
+    byte[] expected = new byte[] {
+        b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), b(0x00),
+        b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), b(0x00), b(0x00)
+    };
+
+    checkPersisting(leastSigBits, mostSigBits, expected);
+  }
+
+  @Test
+  public void testPersistingFFAsUUID() throws Exception {
+    long leastSigBits = 0xFFFF_FFFF_FFFF_FFFFL;
+    long mostSigBits = 0xFFFF_FFFF_FFFF_FFFFL;
+    byte[] expected = new byte[] {
+        b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF),
+        b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF), b(0xFF)
+    };
+
+    checkPersisting(leastSigBits, mostSigBits, expected);
+  }
+
+  @Test
+  public void testPersistingARandomUUID() throws Exception {
+    for (int i = 0; i < 100; i++) {
+      UUID uuid = UUID.randomUUID();
+
+      long mask = 0x0000_0000_0000_00FFL;
+
+      byte[] expected = new byte[] {
+          b(((int) (uuid.getMostSignificantBits() >> 56 & mask))),
+          b(((int) (uuid.getMostSignificantBits() >> 48 & mask))),
+          b(((int) (uuid.getMostSignificantBits() >> 40 & mask))),
+          b(((int) (uuid.getMostSignificantBits() >> 32 & mask))),
+          b(((int) (uuid.getMostSignificantBits() >> 24 & mask))),
+          b(((int) (uuid.getMostSignificantBits() >> 16 & mask))),
+          b(((int) (uuid.getMostSignificantBits() >> 8 & mask))),
+          b(((int) (uuid.getMostSignificantBits() & mask))),
+
+          b(((int) (uuid.getLeastSignificantBits() >> 56 & mask))),
+          b(((int) (uuid.getLeastSignificantBits() >> 48 & mask))),
+          b(((int) (uuid.getLeastSignificantBits() >> 40 & mask))),
+          b(((int) (uuid.getLeastSignificantBits() >> 32 & mask))),
+          b(((int) (uuid.getLeastSignificantBits() >> 24 & mask))),
+          b(((int) (uuid.getLeastSignificantBits() >> 16 & mask))),
+          b(((int) (uuid.getLeastSignificantBits() >> 8 & mask))),
+          b(((int) (uuid.getLeastSignificantBits() & mask))),
+      };
+
+      checkPersisting(
+          uuid.getMostSignificantBits(),
+          uuid.getLeastSignificantBits(),
+          expected
+      );
+    }
+  }
+
+  @Test
+  public void testConvertAndReadBackZeroAsUUID() throws Exception {
+    long mostSigBits = 0x0000_0000_0000_0000L;
+    long leastSigBits = 0x0000_0000_0000_0000L;
+    UUID uuid = new UUID(mostSigBits, leastSigBits);
+    PipelineID pid = PipelineID.valueOf(uuid);
+
+    byte[] encoded = new PipelineIDCodec().toPersistedFormat(pid);
+    PipelineID decoded = new PipelineIDCodec().fromPersistedFormat(encoded);
+
+    assertEquals(pid, decoded);
+  }
+
+  @Test
+  public void testConvertAndReadBackFFAsUUID() throws Exception {
+    long mostSigBits = 0xFFFF_FFFF_FFFF_FFFFL;
+    long leastSigBits = 0xFFFF_FFFF_FFFF_FFFFL;
+    UUID uuid = new UUID(mostSigBits, leastSigBits);
+    PipelineID pid = PipelineID.valueOf(uuid);
+
+    byte[] encoded = new PipelineIDCodec().toPersistedFormat(pid);
+    PipelineID decoded = new PipelineIDCodec().fromPersistedFormat(encoded);
+
+    assertEquals(pid, decoded);
+  }
+
+  @Test
+  public void testConvertAndReadBackRandomUUID() throws Exception {
+    UUID uuid = UUID.randomUUID();
+    PipelineID pid = PipelineID.valueOf(uuid);
+
+    byte[] encoded = new PipelineIDCodec().toPersistedFormat(pid);
+    PipelineID decoded = new PipelineIDCodec().fromPersistedFormat(encoded);
+
+    assertEquals(pid, decoded);
+  }
+
+  private void checkPersisting(
+      long mostSigBits, long leastSigBits, byte[] expected
+  ) throws Exception {
+    UUID uuid = new UUID(mostSigBits, leastSigBits);
+    PipelineID pid = PipelineID.valueOf(uuid);
+
+    byte[] encoded = new PipelineIDCodec().toPersistedFormat(pid);
+
+    assertArrayEquals(expected, encoded);
+  }
+
+  private byte b(int i) {
+    return (byte) (i & 0x0000_00FF);
+  }
+}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 7c2f17e..fc8f61a 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -21,8 +21,10 @@ package org.apache.hadoop.hdds.scm.pipeline;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
+import java.util.NoSuchElementException;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
@@ -37,11 +39,15 @@ import org.apache.hadoop.hdds.scm.TestUtils;
 import org.apache.hadoop.hdds.scm.container.ContainerID;
 import org.apache.hadoop.hdds.scm.container.MockNodeManager;
 import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.metadata.PipelineIDCodec;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
 import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
 import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
 import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
 import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
 import org.apache.hadoop.test.GenericTestUtils;
 
@@ -56,7 +62,14 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.InOrder;
 
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Test cases to verify PipelineManager.
@@ -539,6 +552,108 @@ public class TestSCMPipelineManager {
     pipelineManager.close();
   }
 
+  /**
+   * This test was created for HDDS-3925 to check whether the DB handling is
+   * proper at the SCMPipelineManager level. We should remove this test
+   * when we remove the key swap from the SCMPipelineManager code.
+   *
+   * The test emulates the values that the iterator provides back to the
+   * check-and-fix code path. The iterator internally deserializes the key
+   * stored in RocksDB using the PipelineIDCodec. The older version of the
+   * codec serialized the PipelineIDs by taking the byte[] representation of
+   * the protobuf representation of the PipelineID, and deserialization was
+   * not implemented.
+   *
+   * To be able to detect and fix the old format, deserialization was
+   * introduced, and the key deserialized with the new logic is checked
+   * against the PipelineID serialized into the value via protobuf.
+   * The DB now stores keys as a byte[] serialized from the UUID inside the
+   * PipelineID.
+   * To emulate the old key format, the test makes getKey of the KeyValue
+   * returned by the iterator return a PipelineID that is deserialized from
+   * the byte[] of the protobuf representation of the PipelineID, as that is
+   * the value the iterator would yield when iterating over a table with the
+   * old key format.
+   *
+   * @throws Exception when something goes wrong
+   */
+  @Test
+  public void testPipelineDBKeyFormatChange() throws Exception {
+    Pipeline p1 = pipelineStub();
+    Pipeline p2 = pipelineStub();
+    Pipeline p3 = pipelineStub();
+
+    TableIterator<PipelineID, KeyValue<PipelineID, Pipeline>> iteratorMock =
+        mock(TableIterator.class);
+
+    KeyValue<PipelineID, Pipeline> kv1 =
+        mockKeyValueToProvideOldKeyFormat(p1);
+    KeyValue<PipelineID, Pipeline> kv2 =
+        mockKeyValueToProvideNormalFormat(p2);
+    KeyValue<PipelineID, Pipeline> kv3 =
+        mockKeyValueToProvideOldKeyFormat(p3);
+
+    when(iteratorMock.next())
+        .thenReturn(kv1, kv2, kv3)
+        .thenThrow(new NoSuchElementException());
+    when(iteratorMock.hasNext())
+        .thenReturn(true, true, true, false);
+
+    Table<PipelineID, Pipeline> pipelineStore = mock(Table.class);
+    doReturn(iteratorMock).when(pipelineStore).iterator();
+    when(pipelineStore.isEmpty()).thenReturn(false);
+
+    InOrder inorderVerifier = inOrder(pipelineStore, iteratorMock);
+
+    new SCMPipelineManager(conf, nodeManager, pipelineStore, new EventQueue());
+
+    inorderVerifier.verify(iteratorMock).removeFromDB();
+    inorderVerifier.verify(pipelineStore).put(p1.getId(), p1);
+    inorderVerifier.verify(iteratorMock).removeFromDB();
+    inorderVerifier.verify(pipelineStore).put(p3.getId(), p3);
+
+    verify(pipelineStore, never()).put(p2.getId(), p2);
+  }
+
+  private Pipeline pipelineStub() {
+    return Pipeline.newBuilder()
+        .setId(PipelineID.randomId())
+        .setType(HddsProtos.ReplicationType.RATIS)
+        .setFactor(HddsProtos.ReplicationFactor.ONE)
+        .setState(Pipeline.PipelineState.OPEN)
+        .setNodes(
+            Arrays.asList(
+                nodeManager.getNodes(HddsProtos.NodeState.HEALTHY).get(0)
+            )
+        )
+        .setNodesInOrder(Arrays.asList(0))
+        .build();
+  }
+
+  private KeyValue<PipelineID, Pipeline>
+      mockKeyValueToProvideOldKeyFormat(Pipeline pipeline)
+      throws IOException {
+    KeyValue<PipelineID, Pipeline> kv = mock(KeyValue.class);
+    when(kv.getValue()).thenReturn(pipeline);
+    when(kv.getKey())
+        .thenReturn(
+            new PipelineIDCodec().fromPersistedFormat(
+                pipeline.getId().getProtobuf().toByteArray()
+            ));
+    return kv;
+  }
+
+  private KeyValue<PipelineID, Pipeline>
+      mockKeyValueToProvideNormalFormat(Pipeline pipeline)
+      throws IOException {
+    KeyValue<PipelineID, Pipeline> kv = mock(KeyValue.class);
+    when(kv.getValue()).thenReturn(pipeline);
+    when(kv.getKey()).thenReturn(pipeline.getId());
+    return kv;
+  }
+
   private void sendPipelineReport(DatanodeDetails dn,
       Pipeline pipeline, PipelineReportHandler pipelineReportHandler,
       boolean isLeader, EventQueue eventQueue) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
