This is an automated email from the ASF dual-hosted git repository. sammichen pushed a commit to branch ozone-0.6.0 in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
commit 6cdfc7d64755cfdf601d9da85493fecde5ed3e46 Author: avijayanhwx <[email protected]> AuthorDate: Fri Jul 17 11:51:44 2020 -0700 HDDS-3965. SCM failed to start up for duplicated pipeline detected. (#1210) (cherry picked from commit ca4c5a154bfda6c176775138f809b17e6af1d77e) --- .../hadoop/hdds/utils/db/RDBStoreIterator.java | 40 +++-- .../hadoop/hdds/utils/db/TestRDBStoreIterator.java | 10 +- .../hadoop/hdds/utils/db/TestRDBTableStore.java | 61 +++++++ .../hdds/scm/pipeline/SCMPipelineManager.java | 6 + .../hdds/scm/pipeline/TestSCMPipelineManager.java | 80 +++++++++ ...TestSCMStoreImplWithOldPipelineIDKeyFormat.java | 180 +++++++++++++++++++++ 6 files changed, 360 insertions(+), 17 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java index 5902486..ffe5f96 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/RDBStoreIterator.java @@ -24,6 +24,8 @@ import java.util.NoSuchElementException; import java.util.function.Consumer; import org.rocksdb.RocksIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * RocksDB store iterator. @@ -31,12 +33,16 @@ import org.rocksdb.RocksIterator; public class RDBStoreIterator implements TableIterator<byte[], ByteArrayKeyValue> { + private static final Logger LOG = + LoggerFactory.getLogger(RDBStoreIterator.class); + private RocksIterator rocksDBIterator; private RDBTable rocksDBTable; + private ByteArrayKeyValue currentEntry; public RDBStoreIterator(RocksIterator iterator) { this.rocksDBIterator = iterator; - rocksDBIterator.seekToFirst(); + seekToFirst(); } public RDBStoreIterator(RocksIterator iterator, RDBTable table) { @@ -52,6 +58,15 @@ public class RDBStoreIterator } } + private void setCurrentEntry() { + if (rocksDBIterator.isValid()) { + currentEntry = ByteArrayKeyValue.create(rocksDBIterator.key(), + rocksDBIterator.value()); + } else { + currentEntry = null; + } + } + @Override public boolean hasNext() { return rocksDBIterator.isValid(); @@ -59,12 +74,10 @@ public class RDBStoreIterator @Override public ByteArrayKeyValue next() { - if (rocksDBIterator.isValid()) { - ByteArrayKeyValue value = - ByteArrayKeyValue.create(rocksDBIterator.key(), rocksDBIterator - .value()); + setCurrentEntry(); + if (currentEntry != null) { rocksDBIterator.next(); - return value; + return currentEntry; } throw new NoSuchElementException("RocksDB Store has no more elements"); } @@ -72,21 +85,20 @@ public class RDBStoreIterator @Override public void seekToFirst() { rocksDBIterator.seekToFirst(); + setCurrentEntry(); } @Override public void seekToLast() { rocksDBIterator.seekToLast(); + setCurrentEntry(); } @Override public ByteArrayKeyValue seek(byte[] key) { rocksDBIterator.seek(key); - if (rocksDBIterator.isValid()) { - return ByteArrayKeyValue.create(rocksDBIterator.key(), - rocksDBIterator.value()); - } - return null; + setCurrentEntry(); + return currentEntry; } @Override @@ -111,8 +123,10 @@ public class RDBStoreIterator if (rocksDBTable == null) { throw new UnsupportedOperationException("remove"); } - if (rocksDBIterator.isValid()) { - rocksDBTable.delete(rocksDBIterator.key()); + if (currentEntry != null) { + rocksDBTable.delete(currentEntry.getKey()); + } else { + LOG.info("Unable to delete currentEntry as it does not exist."); } } diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java index 6e85977..fcb7dd2 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBStoreIterator.java @@ -58,12 +58,14 @@ public class TestRDBStoreIterator { @Test public void testForeachRemainingCallsConsumerWithAllElements() { when(rocksDBIteratorMock.isValid()) - .thenReturn(true, true, true, true, true, true, false); + .thenReturn(true, true, true, true, true, true, true, false); when(rocksDBIteratorMock.key()) - .thenReturn(new byte[]{0x00}, new byte[]{0x01}, new byte[]{0x02}) + .thenReturn(new byte[]{0x00}, new byte[]{0x00}, new byte[]{0x01}, + new byte[]{0x02}) .thenThrow(new NoSuchElementException()); when(rocksDBIteratorMock.value()) - .thenReturn(new byte[]{0x7f}, new byte[]{0x7e}, new byte[]{0x7d}) + .thenReturn(new byte[]{0x7f}, new byte[]{0x7f}, new byte[]{0x7e}, + new byte[]{0x7d}) .thenThrow(new NoSuchElementException()); @@ -91,7 +93,7 @@ public class TestRDBStoreIterator { @Test public void testHasNextDependsOnIsvalid(){ - when(rocksDBIteratorMock.isValid()).thenReturn(true, false); + when(rocksDBIteratorMock.isValid()).thenReturn(true, true, false); RDBStoreIterator iter = new RDBStoreIterator(rocksDBIteratorMock); diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java index 00d05a1..5d00763 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/utils/db/TestRDBTableStore.java @@ -364,4 +364,65 @@ public class TestRDBTableStore { Assert.assertTrue(keyCount > 0 && keyCount <= numKeys); } } + + @Test + public void testIteratorRemoveFromDB() throws Exception { + + // Remove without next removes first entry. + try (Table<byte[], byte[]> testTable = rdbStore.getTable("Fifth")) { + writeToTable(testTable, 3); + TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iterator = + testTable.iterator(); + iterator.removeFromDB(); + Assert.assertNull(testTable.get("1".getBytes(StandardCharsets.UTF_8))); + Assert.assertNotNull(testTable.get("2".getBytes(StandardCharsets.UTF_8))); + Assert.assertNotNull(testTable.get("3".getBytes(StandardCharsets.UTF_8))); + } + + // Remove after seekToLast removes lastEntry + try (Table<byte[], byte[]> testTable = rdbStore.getTable("Sixth")) { + writeToTable(testTable, 3); + TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iterator = + testTable.iterator(); + iterator.seekToLast(); + iterator.removeFromDB(); + Assert.assertNotNull(testTable.get("1".getBytes(StandardCharsets.UTF_8))); + Assert.assertNotNull(testTable.get("2".getBytes(StandardCharsets.UTF_8))); + Assert.assertNull(testTable.get("3".getBytes(StandardCharsets.UTF_8))); + } + + // Remove after seek deletes that entry. + try (Table<byte[], byte[]> testTable = rdbStore.getTable("Sixth")) { + writeToTable(testTable, 3); + TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iterator = + testTable.iterator(); + iterator.seek("3".getBytes(StandardCharsets.UTF_8)); + iterator.removeFromDB(); + Assert.assertNotNull(testTable.get("1".getBytes(StandardCharsets.UTF_8))); + Assert.assertNotNull(testTable.get("2".getBytes(StandardCharsets.UTF_8))); + Assert.assertNull(testTable.get("3".getBytes(StandardCharsets.UTF_8))); + } + + // Remove after next() deletes entry that was returned by next. + try (Table<byte[], byte[]> testTable = rdbStore.getTable("Sixth")) { + writeToTable(testTable, 3); + TableIterator<byte[], ? extends Table.KeyValue<byte[], byte[]>> iterator = + testTable.iterator(); + iterator.seek("2".getBytes(StandardCharsets.UTF_8)); + iterator.next(); + iterator.removeFromDB(); + Assert.assertNotNull(testTable.get("1".getBytes(StandardCharsets.UTF_8))); + Assert.assertNull(testTable.get("2".getBytes(StandardCharsets.UTF_8))); + Assert.assertNotNull(testTable.get("3".getBytes(StandardCharsets.UTF_8))); + } + } + + private void writeToTable(Table testTable, int num) throws IOException { + for (int i = 1; i <= num; i++) { + byte[] key = (i + "").getBytes(StandardCharsets.UTF_8); + byte[] value = + RandomStringUtils.random(10).getBytes(StandardCharsets.UTF_8); + testTable.put(key, value); + } + } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index fda9371..6fce895 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@ -209,6 +209,7 @@ public class SCMPipelineManager implements PipelineManager { ) { if (!pipelineID.equals(pipeline.getId())) { try { + LOG.info("Found pipeline in old format key : {}", pipeline.getId()); it.removeFromDB(); pipelineStore.put(pipeline.getId(), pipeline); } catch (IOException e) { @@ -701,4 +702,9 @@ public class SCMPipelineManager implements PipelineManager { startPipelineCreator(); } } + + @VisibleForTesting + protected static Logger getLog() { + return LOG; + } } diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java index fc8f61a..62289b9 100644 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java @@ -22,10 +22,13 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.NoSuchElementException; import java.util.Set; +import java.util.UUID; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -56,12 +59,15 @@ import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_L import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; + +import org.apache.hadoop.test.GenericTestUtils.LogCapturer; import org.junit.After; import org.junit.Assert; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; import org.junit.Before; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.mockito.InOrder; import static org.mockito.Mockito.doReturn; @@ -617,6 +623,80 @@ public class TestSCMPipelineManager { verify(pipelineStore, never()).put(p2.getId(), p2); } + @Test + public void testScmWithPipelineDBKeyFormatChange() throws Exception { + TemporaryFolder tempDir = new TemporaryFolder(); + tempDir.create(); + File dir = tempDir.newFolder(); + conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.getAbsolutePath()); + + SCMMetadataStore scmDbWithOldKeyFormat = null; + Map<UUID, Pipeline> oldPipelines = new HashMap<>(); + try { + scmDbWithOldKeyFormat = + new TestSCMStoreImplWithOldPipelineIDKeyFormat(conf); + // Create 3 pipelines. + for (int i = 0; i < 3; i++) { + Pipeline pipeline = pipelineStub(); + scmDbWithOldKeyFormat.getPipelineTable() + .put(pipeline.getId(), pipeline); + oldPipelines.put(pipeline.getId().getId(), pipeline); + } + } finally { + if (scmDbWithOldKeyFormat != null) { + scmDbWithOldKeyFormat.stop(); + } + } + + LogCapturer logCapturer = + LogCapturer.captureLogs(SCMPipelineManager.getLog()); + + // Create SCMPipelineManager with new DBDefinition. + SCMMetadataStore newScmMetadataStore = null; + try { + newScmMetadataStore = new SCMMetadataStoreImpl(conf); + SCMPipelineManager pipelineManager = new SCMPipelineManager(conf, + nodeManager, + newScmMetadataStore.getPipelineTable(), + new EventQueue()); + + waitForLog(logCapturer); + assertEquals(3, pipelineManager.getPipelines().size()); + oldPipelines.values().forEach(p -> + pipelineManager.containsPipeline(p.getId())); + } finally { + newScmMetadataStore.stop(); + } + + // Mimicking another restart. + try { + logCapturer.clearOutput(); + newScmMetadataStore = new SCMMetadataStoreImpl(conf); + SCMPipelineManager pipelineManager = new SCMPipelineManager(conf, + nodeManager, + newScmMetadataStore.getPipelineTable(), + new EventQueue()); + try { + waitForLog(logCapturer); + Assert.fail("Unexpected log: " + logCapturer.getOutput()); + } catch (TimeoutException ex) { + Assert.assertTrue(ex.getMessage().contains("Timed out")); + } + assertEquals(3, pipelineManager.getPipelines().size()); + oldPipelines.values().forEach(p -> + pipelineManager.containsPipeline(p.getId())); + } finally { + newScmMetadataStore.stop(); + } + } + + private static void waitForLog(LogCapturer logCapturer) + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> logCapturer.getOutput() + .contains("Found pipeline in old format key"), + 1000, 5000); + } + private Pipeline pipelineStub() { return Pipeline.newBuilder() .setId(PipelineID.randomId()) diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java new file mode 100644 index 0000000..a04ecea --- /dev/null +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES; + +import java.io.IOException; +import java.math.BigInteger; +import java.security.cert.X509Certificate; + +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.metadata.PipelineCodec; +import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore; +import org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore; +import org.apache.hadoop.hdds.utils.db.BatchOperationHandler; +import org.apache.hadoop.hdds.utils.db.Codec; +import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition; +import org.apache.hadoop.hdds.utils.db.DBDefinition; +import org.apache.hadoop.hdds.utils.db.DBStore; +import org.apache.hadoop.hdds.utils.db.DBStoreBuilder; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.TableIterator; + +/** + * Test SCM Metadata Store that has ONLY the pipeline table whose key uses the + * old codec format. + */ +public class TestSCMStoreImplWithOldPipelineIDKeyFormat + implements SCMMetadataStore { + + private DBStore store; + private final OzoneConfiguration configuration; + private Table<PipelineID, Pipeline> pipelineTable; + + public TestSCMStoreImplWithOldPipelineIDKeyFormat( + OzoneConfiguration config) throws IOException { + this.configuration = config; + start(configuration); + } + + @Override + public void start(OzoneConfiguration config) + throws IOException { + if (this.store == null) { + this.store = DBStoreBuilder.createDBStore(config, + new SCMDBTestDefinition()); + pipelineTable = PIPELINES.getTable(store); + } + } + + @Override + public void stop() throws Exception { + if (store != null) { + store.close(); + store = null; + } + } + + @Override + public DBStore getStore() { + return null; + } + + @Override + public Table<Long, DeletedBlocksTransaction> getDeletedBlocksTXTable() { + return null; + } + + @Override + public Long getCurrentTXID() { + return null; + } + + @Override + public Long getNextDeleteBlockTXID() { + return null; + } + + @Override + public Table<BigInteger, X509Certificate> getValidCertsTable() { + return null; + } + + @Override + public Table<BigInteger, X509Certificate> getRevokedCertsTable() { + return null; + } + + @Override + public TableIterator getAllCerts(CertificateStore.CertType certType) { + return null; + } + + @Override + public Table<PipelineID, Pipeline> getPipelineTable() { + return pipelineTable; + } + + @Override + public BatchOperationHandler getBatchHandler() { + return null; + } + + @Override + public Table<ContainerID, ContainerInfo> getContainerTable() { + return null; + } + + /** + * Test SCM DB Definition for the above class. + */ + public static class SCMDBTestDefinition implements DBDefinition { + + public static final DBColumnFamilyDefinition<PipelineID, Pipeline> + PIPELINES = + new DBColumnFamilyDefinition<>( + "pipelines", + PipelineID.class, + new OldPipelineIDCodec(), + Pipeline.class, + new PipelineCodec()); + + @Override + public String getName() { + return "scm.db"; + } + + @Override + public String getLocationConfigKey() { + return ScmConfigKeys.OZONE_SCM_DB_DIRS; + } + + @Override + public DBColumnFamilyDefinition[] getColumnFamilies() { + return new DBColumnFamilyDefinition[] {PIPELINES}; + } + } + + /** + * Old Pipeline ID codec that relies on protobuf serialization. + */ + public static class OldPipelineIDCodec implements Codec<PipelineID> { + @Override + public byte[] toPersistedFormat(PipelineID object) throws IOException { + return object.getProtobuf().toByteArray(); + } + + @Override + public PipelineID fromPersistedFormat(byte[] rawData) throws IOException { + return null; + } + + @Override + public PipelineID copyObject(PipelineID object) { + throw new UnsupportedOperationException(); + } + } + +} + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
