This is an automated email from the ASF dual-hosted git repository.
elek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 0466ade HDDS-3172. Use DBStore instead of MetadataStore in SCM
0466ade is described below
commit 0466ade7d71bb65466fc45f7b481bd2ff34bbbe8
Author: Elek Márton <[email protected]>
AuthorDate: Wed Apr 22 12:21:22 2020 +0200
HDDS-3172. Use DBStore instead of MetadataStore in SCM
Closes #700
---
.../java/org/apache/hadoop/ozone/OzoneConsts.java | 5 -
.../hdds/utils/db/BatchOperationHandler.java | 44 ++++++++
.../hdds/utils/db/DBColumnFamilyDefinition.java | 81 +++++++++++++++
.../apache/hadoop/hdds/utils/db/DBDefinition.java | 46 +++++++++
.../org/apache/hadoop/hdds/utils/db/DBStore.java | 18 +---
.../hadoop/hdds/utils/db/DBStoreBuilder.java | 54 +++++++++-
.../hdds/scm/block/SCMBlockDeletingService.java | 37 ++++---
.../hdds/scm/container/ContainerManager.java | 6 +-
.../hdds/scm/container/SCMContainerManager.java | 113 +++++++++------------
.../hadoop/hdds/scm/metadata/ContainerIDCodec.java | 48 +++++++++
.../hdds/scm/metadata/ContainerInfoCodec.java | 47 +++++++++
.../hadoop/hdds/scm/metadata/PipelineCodec.java | 56 ++++++++++
.../hadoop/hdds/scm/metadata/PipelineIDCodec.java | 45 ++++++++
.../hadoop/hdds/scm/metadata/SCMDBDefinition.java | 98 ++++++++++++++++++
.../hadoop/hdds/scm/metadata/SCMMetadataStore.java | 27 ++++-
.../hdds/scm/metadata/SCMMetadataStoreRDBImpl.java | 113 +++++++++++----------
.../hdds/scm/pipeline/SCMPipelineManager.java | 85 +++++-----------
.../hdds/scm/server/StorageContainerManager.java | 10 +-
.../hadoop/hdds/scm/block/TestBlockManager.java | 32 ++++--
.../container/TestCloseContainerEventHandler.java | 15 ++-
.../scm/container/TestSCMContainerManager.java | 14 ++-
.../hdds/scm/node/TestContainerPlacement.java | 31 ++++--
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 54 +++++++---
.../safemode/TestHealthyPipelineSafeModeRule.java | 36 ++++---
.../TestOneReplicaPipelineSafeModeRule.java | 18 +++-
.../hdds/scm/safemode/TestSCMSafeModeManager.java | 58 +++++++++--
hadoop-ozone/dist/pom.xml | 2 +-
.../ozone/recon/scm/ReconContainerManager.java | 27 ++---
.../hadoop/ozone/recon/scm/ReconDBDefinition.java | 38 +++++++
.../ozone/recon/scm/ReconPipelineManager.java | 25 ++---
.../scm/ReconStorageContainerManagerFacade.java | 42 +++++---
.../scm/AbstractReconContainerManagerTest.java | 37 ++++---
.../ozone/recon/scm/TestReconPipelineManager.java | 37 ++++---
.../ozone/genesis/BenchMarkOzoneManager.java | 9 +-
.../apache/hadoop/ozone/genesis/BenchMarkSCM.java | 9 +-
.../apache/hadoop/ozone/genesis/GenesisUtil.java | 30 +++---
36 files changed, 1054 insertions(+), 393 deletions(-)
diff --git
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
index a058d76..a88acc2 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConsts.java
@@ -114,14 +114,9 @@ public final class OzoneConsts {
*/
public static final String CONTAINER_DB_SUFFIX = "container.db";
public static final String PIPELINE_DB_SUFFIX = "pipeline.db";
- public static final String SCM_CONTAINER_DB = "scm-" + CONTAINER_DB_SUFFIX;
- public static final String SCM_PIPELINE_DB = "scm-" + PIPELINE_DB_SUFFIX;
public static final String DN_CONTAINER_DB = "-dn-"+ CONTAINER_DB_SUFFIX;
- public static final String DELETED_BLOCK_DB = "deletedBlock.db";
public static final String OM_DB_NAME = "om.db";
public static final String OM_DB_BACKUP_PREFIX = "om.db.backup.";
- public static final String OZONE_MANAGER_TOKEN_DB_NAME = "om-token.db";
- public static final String SCM_DB_NAME = "scm.db";
public static final String STORAGE_DIR_CHUNKS = "chunks";
public static final String OZONE_DB_CHECKPOINT_REQUEST_FLUSH =
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java
new file mode 100644
index 0000000..eea483c
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/BatchOperationHandler.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.IOException;
+
+/**
+ * Create and commit batch operation for one DB.
+ */
+public interface BatchOperationHandler {
+
+ /**
+ * Initialize an atomic batch operation which can hold multiple PUT/DELETE
+ * operations and be committed later in one step.
+ *
+ * @return BatchOperation holder which can be used to add or commit batch
+ * operations.
+ */
+ BatchOperation initBatchOperation();
+
+ /**
+ * Commit the batch operations.
+ *
+ * @param operation which contains all the required batch operation.
+ * @throws IOException on Failure.
+ */
+ void commitBatchOperation(BatchOperation operation) throws IOException;
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBColumnFamilyDefinition.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBColumnFamilyDefinition.java
new file mode 100644
index 0000000..e1c4163
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBColumnFamilyDefinition.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import java.io.IOException;
+
+/**
+ * Definition of a single column family (table) with its types and codecs.
+ *
+ * @param <KEY> the type of the key.
+ * @param <VALUE> the type of the value.
+ */
+public class DBColumnFamilyDefinition<KEY, VALUE> {
+
+ private final String tableName;
+
+ private final Class<KEY> keyType;
+
+ private final Codec<KEY> keyCodec;
+
+ private final Class<VALUE> valueType;
+
+ private final Codec<VALUE> valueCodec;
+
+ public DBColumnFamilyDefinition(
+ String tableName,
+ Class<KEY> keyType,
+ Codec<KEY> keyCodec,
+ Class<VALUE> valueType,
+ Codec<VALUE> valueCodec) {
+ this.tableName = tableName;
+ this.keyType = keyType;
+ this.keyCodec = keyCodec;
+ this.valueType = valueType;
+ this.valueCodec = valueCodec;
+ }
+
+ public Table<KEY, VALUE> getTable(DBStore db) throws IOException {
+ return db.getTable(tableName, keyType, valueType);
+ }
+
+ public String getName() {
+ return tableName;
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public Class<KEY> getKeyType() {
+ return keyType;
+ }
+
+ public Codec<KEY> getKeyCodec() {
+ return keyCodec;
+ }
+
+ public Class<VALUE> getValueType() {
+ return valueType;
+ }
+
+ public Codec<VALUE> getValueCodec() {
+ return valueCodec;
+ }
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java
new file mode 100644
index 0000000..3058261
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBDefinition.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.utils.db;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Simple interface to provide information to create a DBStore.
+ */
+public interface DBDefinition {
+
+ Logger LOG = LoggerFactory.getLogger(DBDefinition.class);
+
+ /**
+ * Logical name of the DB.
+ */
+ String getName();
+
+ /**
+ * Configuration key which defines the location of the DB.
+ */
+ String getLocationConfigKey();
+
+ /**
+ * Definitions of the column families (tables) of the DB.
+ */
+ DBColumnFamilyDefinition[] getColumnFamilies();
+
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
index ed64b74..8567d03 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStore.java
@@ -35,7 +35,7 @@ import org.apache.hadoop.hdds.utils.db.cache.TableCacheImpl;
*
*/
@InterfaceStability.Evolving
-public interface DBStore extends AutoCloseable {
+public interface DBStore extends AutoCloseable, BatchOperationHandler {
/**
* Gets an existing TableStore.
@@ -141,22 +141,6 @@ public interface DBStore extends AutoCloseable {
*/
long getEstimatedKeyCount() throws IOException;
- /**
- * Initialize an atomic batch operation which can hold multiple PUT/DELETE
- * operations and committed later in one step.
- *
- * @return BatchOperation holder which can be used to add or commit batch
- * operations.
- */
- BatchOperation initBatchOperation();
-
- /**
- * Commit the batch operations.
- *
- * @param operation which contains all the required batch operation.
- * @throws IOException on Failure.
- */
- void commitBatchOperation(BatchOperation operation) throws IOException;
/**
* Get current snapshot of DB store as an artifact stored on
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
index 88d509a..2e18530 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/utils/db/DBStoreBuilder.java
@@ -29,12 +29,15 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Set;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.StringUtils;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import com.google.common.base.Preconditions;
import static org.apache.hadoop.hdds.HddsConfigKeys.HDDS_DB_PROFILE;
+import static org.apache.hadoop.hdds.server.ServerUtils.getDirectoryFromConfig;
+import static org.apache.hadoop.hdds.server.ServerUtils.getOzoneMetaDirPath;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT;
import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF;
@@ -78,10 +81,11 @@ public final class DBStoreBuilder {
private String rocksDbStat;
private RocksDBConfiguration rocksDBConfiguration;
- private DBStoreBuilder(OzoneConfiguration configuration) {
+ private DBStoreBuilder(ConfigurationSource configuration) {
this(configuration, configuration.getObject(RocksDBConfiguration.class));
}
- private DBStoreBuilder(OzoneConfiguration configuration,
+
+ private DBStoreBuilder(ConfigurationSource configuration,
RocksDBConfiguration rocksDBConfiguration) {
tables = new HashSet<>();
tableNames = new LinkedList<>();
@@ -93,8 +97,7 @@ public final class DBStoreBuilder {
this.rocksDBConfiguration = rocksDBConfiguration;
}
-
- public static DBStoreBuilder newBuilder(OzoneConfiguration configuration) {
+ public static DBStoreBuilder newBuilder(ConfigurationSource configuration) {
return new DBStoreBuilder(configuration);
}
@@ -263,4 +266,45 @@ public final class DBStoreBuilder {
return Paths.get(dbPath.toString(), dbname).toFile();
}
+ private static DBStoreBuilder createDBStoreBuilder(
+ ConfigurationSource configuration, DBDefinition definition) {
+
+ File metadataDir = getDirectoryFromConfig(configuration,
+ definition.getLocationConfigKey(), definition.getName());
+
+ if (metadataDir == null) {
+
+ LOG.warn("{} is not configured. We recommend adding this setting. " +
+ "Falling back to {} instead.",
+ definition.getLocationConfigKey(),
+ HddsConfigKeys.OZONE_METADATA_DIRS);
+ metadataDir = getOzoneMetaDirPath(configuration);
+ }
+
+ return DBStoreBuilder.newBuilder(configuration)
+ .setName(definition.getName())
+ .setPath(Paths.get(metadataDir.getPath()));
+ }
+
+ /**
+ * Create DBStore from a generic DBDefinition.
+ */
+ public static DBStore createDBStore(ConfigurationSource configuration,
+ DBDefinition definition)
+ throws IOException {
+ DBStoreBuilder builder = createDBStoreBuilder(configuration, definition);
+ for (DBColumnFamilyDefinition columnTableDefinition : definition
+ .getColumnFamilies()) {
+ builder.registerTable(columnTableDefinition);
+ }
+ return builder.build();
+ }
+
+ private <KEY, VALUE> void registerTable(
+ DBColumnFamilyDefinition<KEY, VALUE> definition) {
+ addTable(definition.getName())
+ .addCodec(definition.getKeyType(), definition.getKeyCodec())
+ .addCodec(definition.getValueType(), definition.getValueCodec());
+ }
+
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 5ca75d2..0980369 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -16,38 +16,35 @@
*/
package org.apache.hadoop.hdds.scm.block;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+
import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
-import org.apache.hadoop.util.Time;
import org.apache.hadoop.hdds.utils.BackgroundService;
import org.apache.hadoop.hdds.utils.BackgroundTask;
import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
import org.apache.hadoop.hdds.utils.BackgroundTaskResult.EmptyTaskResult;
+import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
+import static
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.IOException;
-import java.util.List;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL;
-import static org.apache.hadoop.ozone.OzoneConfigKeys
- .OZONE_BLOCK_DELETING_CONTAINER_LIMIT_PER_INTERVAL_DEFAULT;
-
/**
* A background service running in SCM to delete blocks. This service scans
* block deletion log in certain interval and caches block deletion commands
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
index 973026d..43c1ced 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManager.java
@@ -16,15 +16,15 @@
*/
package org.apache.hadoop.hdds.scm.container;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+
// TODO: Write extensive java doc.
// This is the main interface of ContainerManager.
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
index 38c3d11..9f47608 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/SCMContainerManager.java
@@ -16,7 +16,6 @@
*/
package org.apache.hadoop.hdds.scm.container;
-import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -32,7 +31,6 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
@@ -42,18 +40,13 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
-import org.apache.hadoop.hdds.server.ServerUtils;
-import org.apache.hadoop.hdds.utils.BatchOperation;
-import org.apache.hadoop.hdds.utils.MetadataStore;
-import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
-import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
-import com.google.common.primitives.Longs;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_CONTAINER_DB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -67,38 +60,40 @@ public class SCMContainerManager implements
ContainerManager {
SCMContainerManager.class);
private final Lock lock;
- private final MetadataStore containerStore;
+
private final PipelineManager pipelineManager;
+
private final ContainerStateManager containerStateManager;
+
private final int numContainerPerOwnerInPipeline;
private final SCMContainerManagerMetrics scmContainerManagerMetrics;
+ private Table<ContainerID, ContainerInfo> containerStore;
+
+ private BatchOperationHandler batchHandler;
+
/**
* Constructs a mapping class that creates mapping between container names
* and pipelines.
- *
+ * <p>
* passed to LevelDB and this memory is allocated in Native code space.
* CacheSize is specified
* in MB.
- * @param conf - {@link ConfigurationSource}
+ *
+ * @param conf - {@link ConfigurationSource}
* @param pipelineManager - {@link PipelineManager}
* @throws IOException on Failure.
*/
- public SCMContainerManager(final ConfigurationSource conf,
+ public SCMContainerManager(
+ final ConfigurationSource conf,
+ Table<ContainerID, ContainerInfo> containerStore,
+ BatchOperationHandler batchHandler,
PipelineManager pipelineManager)
throws IOException {
- final File containerDBPath = getContainerDBPath(conf);
- final int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
- OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
-
- this.containerStore = MetadataStoreBuilder.newBuilder()
- .setConf(conf)
- .setDbFile(containerDBPath)
- .setCacheSize(cacheSize * OzoneConsts.MB)
- .build();
-
+ this.batchHandler = batchHandler;
+ this.containerStore = containerStore;
this.lock = new ReentrantLock();
this.pipelineManager = pipelineManager;
this.containerStateManager = new ContainerStateManager(conf);
@@ -112,11 +107,12 @@ public class SCMContainerManager implements
ContainerManager {
}
private void loadExistingContainers() throws IOException {
- List<Map.Entry<byte[], byte[]>> range = containerStore
- .getSequentialRangeKVs(null, Integer.MAX_VALUE, null);
- for (Map.Entry<byte[], byte[]> entry : range) {
- ContainerInfo container = ContainerInfo.fromProtobuf(
- ContainerInfoProto.PARSER.parseFrom(entry.getValue()));
+
+ TableIterator<ContainerID, ? extends KeyValue<ContainerID, ContainerInfo>>
+ iterator = containerStore.iterator();
+
+ while (iterator.hasNext()) {
+ ContainerInfo container = iterator.next().getValue();
Preconditions.checkNotNull(container);
containerStateManager.loadContainer(container);
try {
@@ -304,10 +300,8 @@ public class SCMContainerManager implements
ContainerManager {
lock.lock();
try {
containerStateManager.removeContainer(containerID);
- final byte[] dbKey = Longs.toByteArray(containerID.getId());
- final byte[] containerBytes = containerStore.get(dbKey);
- if (containerBytes != null) {
- containerStore.delete(dbKey);
+ if (containerStore.get(containerID) != null) {
+ containerStore.delete(containerID);
} else {
// Where did the container go? o_O
LOG.warn("Unable to remove the container {} from container store," +
@@ -358,8 +352,7 @@ public class SCMContainerManager implements
ContainerManager {
containerID);
}
}
- final byte[] dbKey = Longs.toByteArray(containerID.getId());
- containerStore.put(dbKey, container.getProtobuf().toByteArray());
+ containerStore.put(containerID, container);
return newState;
} catch (ContainerNotFoundException cnfe) {
throw new SCMException(
@@ -372,38 +365,40 @@ public class SCMContainerManager implements
ContainerManager {
}
}
- /**
- * Update deleteTransactionId according to deleteTransactionMap.
- *
- * @param deleteTransactionMap Maps the containerId to latest delete
- * transaction id for the container.
- * @throws IOException
- */
+ /**
+ * Update deleteTransactionId according to deleteTransactionMap.
+ *
+ * @param deleteTransactionMap Maps the containerId to latest delete
+ * transaction id for the container.
+ * @throws IOException
+ */
public void updateDeleteTransactionId(Map<Long, Long> deleteTransactionMap)
throws IOException {
+
if (deleteTransactionMap == null) {
return;
}
-
+ org.apache.hadoop.hdds.utils.db.BatchOperation batchOperation =
+ batchHandler.initBatchOperation();
lock.lock();
try {
- BatchOperation batch = new BatchOperation();
for (Map.Entry<Long, Long> entry : deleteTransactionMap.entrySet()) {
long containerID = entry.getKey();
- byte[] dbKey = Longs.toByteArray(containerID);
- byte[] containerBytes = containerStore.get(dbKey);
- if (containerBytes == null) {
+
+ ContainerID containerIdObject = new ContainerID(containerID);
+ ContainerInfo containerInfo =
+ containerStore.get(containerIdObject);
+ if (containerInfo == null) {
throw new SCMException(
"Failed to increment number of deleted blocks for container "
+ containerID + ", reason : " + "container doesn't exist.",
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
- ContainerInfo containerInfo = ContainerInfo.fromProtobuf(
- HddsProtos.ContainerInfoProto.parseFrom(containerBytes));
containerInfo.updateDeleteTransactionId(entry.getValue());
- batch.put(dbKey, containerInfo.getProtobuf().toByteArray());
+ containerStore
+ .putWithBatch(batchOperation, containerIdObject, containerInfo);
}
- containerStore.writeBatch(batch);
+ batchHandler.commitBatchOperation(batchOperation);
containerStateManager
.updateDeleteTransactionId(deleteTransactionMap);
} finally {
@@ -477,10 +472,8 @@ public class SCMContainerManager implements
ContainerManager {
protected void addContainerToDB(ContainerInfo containerInfo)
throws IOException {
try {
- final byte[] containerIDBytes = Longs.toByteArray(
- containerInfo.getContainerID());
- containerStore.put(containerIDBytes,
- containerInfo.getProtobuf().toByteArray());
+ containerStore
+ .put(new ContainerID(containerInfo.getContainerID()), containerInfo);
// Incrementing here, as allocateBlock to create a container calls
// getMatchingContainer() and finally calls this API to add newly
// created container to DB.
@@ -586,9 +579,6 @@ public class SCMContainerManager implements
ContainerManager {
if (containerStateManager != null) {
containerStateManager.close();
}
- if (containerStore != null) {
- containerStore.close();
- }
if (scmContainerManagerMetrics != null) {
this.scmContainerManagerMetrics.unRegister();
@@ -612,11 +602,6 @@ public class SCMContainerManager implements
ContainerManager {
}
}
- protected File getContainerDBPath(ConfigurationSource conf) {
- File metaDir = ServerUtils.getScmDbDir(conf);
- return new File(metaDir, SCM_CONTAINER_DB);
- }
-
protected PipelineManager getPipelineManager() {
return pipelineManager;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ContainerIDCodec.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ContainerIDCodec.java
new file mode 100644
index 0000000..87c9e91
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ContainerIDCodec.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.utils.db.Codec;
+import org.apache.hadoop.hdds.utils.db.LongCodec;
+
+/**
+ * Codec to serialize / deserialize ContainerID.
+ */
+public class ContainerIDCodec implements Codec<ContainerID> {
+
+ private Codec<Long> longCodec = new LongCodec();
+
+ @Override
+ public byte[] toPersistedFormat(ContainerID container) throws IOException {
+ return longCodec.toPersistedFormat(container.getId());
+ }
+
+ @Override
+ public ContainerID fromPersistedFormat(byte[] rawData) throws IOException {
+ return new ContainerID(longCodec.fromPersistedFormat(rawData));
+ }
+
+ @Override
+ public ContainerID copyObject(ContainerID object) {
+ return new ContainerID(object.getId());
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ContainerInfoCodec.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ContainerInfoCodec.java
new file mode 100644
index 0000000..6b26215
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/ContainerInfoCodec.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ContainerInfoProto;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.utils.db.Codec;
+
+/**
+ * Codec to serialize / deserialize ContainerInfo.
+ */
+public class ContainerInfoCodec implements Codec<ContainerInfo> {
+
+ @Override
+ public byte[] toPersistedFormat(ContainerInfo container) throws IOException {
+ return container.getProtobuf().toByteArray();
+ }
+
+ @Override
+ public ContainerInfo fromPersistedFormat(byte[] rawData) throws IOException {
+ return ContainerInfo.fromProtobuf(
+ ContainerInfoProto.PARSER.parseFrom(rawData));
+ }
+
+ @Override
+ public ContainerInfo copyObject(ContainerInfo object) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineCodec.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineCodec.java
new file mode 100644
index 0000000..25a1e44
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineCodec.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.io.IOException;
+import java.time.Instant;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.utils.db.Codec;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Codec to serialize / deserialize Pipeline.
+ *
+ * Decoding is deliberately not a pure round trip: every decoded pipeline has
+ * its state forced to PIPELINE_ALLOCATED and its creation timestamp set to
+ * the moment of decoding.
+ */
+public class PipelineCodec implements Codec<Pipeline> {
+
+  /**
+   * Serializes a pipeline as the bytes of its protobuf message.
+   *
+   * @param object the pipeline to serialize
+   * @return the protobuf-encoded bytes
+   */
+  @Override
+  public byte[] toPersistedFormat(Pipeline object) throws IOException {
+    return object.getProtobufMessage().toByteArray();
+  }
+
+  /**
+   * Parses the bytes as a HddsProtos.Pipeline, overrides its state with
+   * PIPELINE_ALLOCATED and converts the result to a Pipeline whose creation
+   * timestamp is the current time.
+   *
+   * @param rawData protobuf-encoded bytes
+   * @return the decoded pipeline, never null
+   */
+  @Override
+  public Pipeline fromPersistedFormat(byte[] rawData) throws IOException {
+    HddsProtos.Pipeline.Builder pipelineBuilder = HddsProtos.Pipeline
+        .newBuilder(HddsProtos.Pipeline.PARSER.parseFrom(rawData));
+    Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
+        HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
+    // Check before the first dereference; a check placed after
+    // setCreationTimestamp() could never fire.
+    Preconditions.checkNotNull(pipeline);
+    // When SCM is restarted, set Creation time with current time.
+    pipeline.setCreationTimestamp(Instant.now());
+    return pipeline;
+  }
+
+  /**
+   * Copying is not supported by this codec.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public Pipeline copyObject(Pipeline object) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java
new file mode 100644
index 0000000..d661e34
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/PipelineIDCodec.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.utils.db.Codec;
+
+/**
+ * Codec to serialize / deserialize PipelineID.
+ *
+ * The persisted form is the byte encoding of the HddsProtos.PipelineID
+ * message.
+ */
+public class PipelineIDCodec implements Codec<PipelineID> {
+
+  /**
+   * Serializes a pipeline id as the bytes of its protobuf message.
+   *
+   * @param object the pipeline id to serialize
+   * @return the protobuf-encoded bytes
+   */
+  @Override
+  public byte[] toPersistedFormat(PipelineID object) throws IOException {
+    return object.getProtobuf().toByteArray();
+  }
+
+  /**
+   * Decodes bytes written by {@link #toPersistedFormat(PipelineID)}.
+   *
+   * @param rawData protobuf-encoded bytes
+   * @return the decoded pipeline id
+   * @throws IOException if the bytes are not a valid protobuf message
+   */
+  @Override
+  public PipelineID fromPersistedFormat(byte[] rawData) throws IOException {
+    // Returning null here would hand callers a null key for every row read
+    // through the typed table. The proto class is fully qualified so that
+    // this file needs no additional import.
+    return PipelineID.getFromProtobuf(
+        org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID.PARSER
+            .parseFrom(rawData));
+  }
+
+  /**
+   * Copying is not supported by this codec.
+   *
+   * @throws UnsupportedOperationException always
+   */
+  @Override
+  public PipelineID copyObject(PipelineID object) {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
new file mode 100644
index 0000000..fcddcdd
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import java.math.BigInteger;
+import java.security.cert.X509Certificate;
+
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import org.apache.hadoop.hdds.utils.db.DBColumnFamilyDefinition;
+import org.apache.hadoop.hdds.utils.db.DBDefinition;
+import org.apache.hadoop.hdds.utils.db.LongCodec;
+
+/**
+ * Class defines the structure and types of the scm.db.
+ *
+ * Each constant describes one column family: its name, the key and value
+ * classes, and the codec used for each of them.
+ */
+public class SCMDBDefinition implements DBDefinition {
+
+  /**
+   * Column family "deletedBlocks": Long key, DeletedBlocksTransaction value.
+   */
+  public static final DBColumnFamilyDefinition<Long, DeletedBlocksTransaction>
+      DELETED_BLOCKS = new DBColumnFamilyDefinition<>(
+          "deletedBlocks",
+          Long.class, new LongCodec(),
+          DeletedBlocksTransaction.class, new DeletedBlocksTransactionCodec());
+
+  /**
+   * Column family "validCerts": BigInteger key, X509Certificate value.
+   */
+  public static final DBColumnFamilyDefinition<BigInteger, X509Certificate>
+      VALID_CERTS = new DBColumnFamilyDefinition<>(
+          "validCerts",
+          BigInteger.class, new BigIntegerCodec(),
+          X509Certificate.class, new X509CertificateCodec());
+
+  /**
+   * Column family "revokedCerts": BigInteger key, X509Certificate value.
+   */
+  public static final DBColumnFamilyDefinition<BigInteger, X509Certificate>
+      REVOKED_CERTS = new DBColumnFamilyDefinition<>(
+          "revokedCerts",
+          BigInteger.class, new BigIntegerCodec(),
+          X509Certificate.class, new X509CertificateCodec());
+
+  /**
+   * Column family "pipelines": PipelineID key, Pipeline value.
+   */
+  public static final DBColumnFamilyDefinition<PipelineID, Pipeline>
+      PIPELINES = new DBColumnFamilyDefinition<>(
+          "pipelines",
+          PipelineID.class, new PipelineIDCodec(),
+          Pipeline.class, new PipelineCodec());
+
+  /**
+   * Column family "containers": ContainerID key, ContainerInfo value.
+   */
+  public static final DBColumnFamilyDefinition<ContainerID, ContainerInfo>
+      CONTAINERS = new DBColumnFamilyDefinition<>(
+          "containers",
+          ContainerID.class, new ContainerIDCodec(),
+          ContainerInfo.class, new ContainerInfoCodec());
+
+  /**
+   * @return the database name, "scm.db"
+   */
+  @Override
+  public String getName() {
+    return "scm.db";
+  }
+
+  /**
+   * @return the configuration key that holds the database location,
+   *         ScmConfigKeys.OZONE_SCM_DB_DIRS
+   */
+  @Override
+  public String getLocationConfigKey() {
+    return ScmConfigKeys.OZONE_SCM_DB_DIRS;
+  }
+
+  /**
+   * @return all column families of this database, in declaration order
+   */
+  @Override
+  public DBColumnFamilyDefinition[] getColumnFamilies() {
+    return new DBColumnFamilyDefinition[] {
+        DELETED_BLOCKS,
+        VALID_CERTS,
+        REVOKED_CERTS,
+        PIPELINES,
+        CONTAINERS
+    };
+  }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
index 1150316..0452c05 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -17,18 +17,24 @@
*/
package org.apache.hadoop.hdds.scm.metadata;
+import java.io.IOException;
import java.math.BigInteger;
import java.security.cert.X509Certificate;
+
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import java.io.IOException;
-import com.google.common.annotations.VisibleForTesting;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
+import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.utils.db.TableIterator;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Generic interface for data stores for SCM.
* This is similar to the OMMetadataStore class,
@@ -99,5 +105,18 @@ public interface SCMMetadataStore {
*/
TableIterator getAllCerts(CertificateStore.CertType certType);
+ /**
+ * A Table that maintains all the pipeline information.
+ */
+ Table<PipelineID, Pipeline> getPipelineTable();
+
+ /**
+ * Helper to create and write batch transactions.
+ */
+ BatchOperationHandler getBatchHandler();
+ /**
+ * Table that maintains all the container information.
+ */
+ Table<ContainerID, ContainerInfo> getContainerTable();
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
index 72818a3..3823fd8 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreRDBImpl.java
@@ -17,53 +17,48 @@
*/
package org.apache.hadoop.hdds.scm.metadata;
-import java.io.File;
+import java.io.IOException;
import java.math.BigInteger;
-import java.nio.file.Paths;
import java.security.cert.X509Certificate;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import java.io.IOException;
-import org.apache.hadoop.hdds.security.x509.certificate.authority
- .CertificateStore;
-import org.apache.hadoop.hdds.server.ServerUtils;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
+import
org.apache.hadoop.hdds.security.x509.certificate.authority.CertificateStore;
+import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
import org.apache.hadoop.hdds.utils.db.TableIterator;
+
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
+import static
org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.DELETED_BLOCKS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
+import static
org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.VALID_CERTS;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_DB_NAME;
-
/**
* A RocksDB based implementation of SCM Metadata Store.
- * <p>
- * <p>
- * +---------------+------------------+-------------------------+
- * | Column Family | Key | Value |
- * +---------------+------------------+-------------------------+
- * | DeletedBlocks | TXID(Long) | DeletedBlockTransaction |
- * +---------------+------------------+-------------------------+
- * | ValidCerts | Serial (BigInt) | X509Certificate |
- * +---------------+------------------+-------------------------+
- * |RevokedCerts | Serial (BigInt) | X509Certificate |
- * +---------------+------------------+-------------------------+
+ *
*/
public class SCMMetadataStoreRDBImpl implements SCMMetadataStore {
- private static final String DELETED_BLOCKS_TABLE = "deletedBlocks";
- private Table deletedBlocksTable;
+ private Table<Long, DeletedBlocksTransaction> deletedBlocksTable;
- private static final String VALID_CERTS_TABLE = "validCerts";
- private Table validCertsTable;
+ private Table<BigInteger, X509Certificate> validCertsTable;
- private static final String REVOKED_CERTS_TABLE = "revokedCerts";
- private Table revokedCertsTable;
+ private Table<BigInteger, X509Certificate> revokedCertsTable;
+ private Table<ContainerID, ContainerInfo> containerTable;
+ private Table<PipelineID, Pipeline> pipelineTable;
private static final Logger LOG =
LoggerFactory.getLogger(SCMMetadataStoreRDBImpl.class);
@@ -88,31 +83,26 @@ public class SCMMetadataStoreRDBImpl implements
SCMMetadataStore {
public void start(OzoneConfiguration config)
throws IOException {
if (this.store == null) {
- File metaDir = ServerUtils.getScmDbDir(configuration);
-
- this.store = DBStoreBuilder.newBuilder(configuration)
- .setName(SCM_DB_NAME)
- .setPath(Paths.get(metaDir.getPath()))
- .addTable(DELETED_BLOCKS_TABLE)
- .addTable(VALID_CERTS_TABLE)
- .addTable(REVOKED_CERTS_TABLE)
- .addCodec(DeletedBlocksTransaction.class,
- new DeletedBlocksTransactionCodec())
- .addCodec(BigInteger.class, new BigIntegerCodec())
- .addCodec(X509Certificate.class, new X509CertificateCodec())
- .build();
-
- deletedBlocksTable = this.store.getTable(DELETED_BLOCKS_TABLE,
- Long.class, DeletedBlocksTransaction.class);
- checkTableStatus(deletedBlocksTable, DELETED_BLOCKS_TABLE);
-
- validCertsTable = this.store.getTable(VALID_CERTS_TABLE,
- BigInteger.class, X509Certificate.class);
- checkTableStatus(validCertsTable, VALID_CERTS_TABLE);
-
- revokedCertsTable = this.store.getTable(REVOKED_CERTS_TABLE,
- BigInteger.class, X509Certificate.class);
- checkTableStatus(revokedCertsTable, REVOKED_CERTS_TABLE);
+
+ this.store = DBStoreBuilder.createDBStore(config, new SCMDBDefinition());
+
+ deletedBlocksTable =
+ DELETED_BLOCKS.getTable(this.store);
+
+ checkTableStatus(deletedBlocksTable,
+ DELETED_BLOCKS.getName());
+
+ validCertsTable = VALID_CERTS.getTable(store);
+
+ checkTableStatus(validCertsTable, VALID_CERTS.getName());
+
+ revokedCertsTable = REVOKED_CERTS.getTable(store);
+
+ checkTableStatus(revokedCertsTable, REVOKED_CERTS.getName());
+
+ pipelineTable = PIPELINES.getTable(store);
+
+ containerTable = CONTAINERS.getTable(store);
}
}
@@ -163,6 +153,21 @@ public class SCMMetadataStoreRDBImpl implements
SCMMetadataStore {
}
@Override
+ public Table<PipelineID, Pipeline> getPipelineTable() {
+ return pipelineTable;
+ }
+
+ @Override
+ public BatchOperationHandler getBatchHandler() {
+ return this.store;
+ }
+
+ @Override
+ public Table<ContainerID, ContainerInfo> getContainerTable() {
+ return containerTable;
+ }
+
+ @Override
public Long getCurrentTXID() {
return this.txID.get();
}
@@ -174,8 +179,8 @@ public class SCMMetadataStoreRDBImpl implements
SCMMetadataStore {
* @throws IOException
*/
private Long getLargestRecordedTXID() throws IOException {
- try (TableIterator<Long, DeletedBlocksTransaction> txIter =
- deletedBlocksTable.iterator()) {
+ try (TableIterator<Long, ? extends KeyValue<Long,
DeletedBlocksTransaction>>
+ txIter = deletedBlocksTable.iterator()) {
txIter.seekToLast();
Long txid = txIter.key();
if (txid != null) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 200b358..38edbac 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -19,7 +19,6 @@
package org.apache.hadoop.hdds.scm.pipeline;
import javax.management.ObjectName;
-import java.io.File;
import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
@@ -38,7 +37,6 @@ import java.util.stream.Collectors;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -46,19 +44,15 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.utils.MetadataKeyFilters;
-import org.apache.hadoop.hdds.utils.MetadataStore;
-import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
import org.apache.hadoop.hdds.utils.Scheduler;
+import org.apache.hadoop.hdds.utils.db.Table;
+import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -77,7 +71,6 @@ public class SCMPipelineManager implements PipelineManager {
private PipelineStateManager stateManager;
private final BackgroundPipelineCreator backgroundPipelineCreator;
private Scheduler scheduler;
- private MetadataStore pipelineStore;
private final EventPublisher eventPublisher;
private final NodeManager nodeManager;
@@ -87,28 +80,35 @@ public class SCMPipelineManager implements PipelineManager {
// Pipeline Manager MXBean
private ObjectName pmInfoBean;
+ private Table<PipelineID, Pipeline> pipelineStore;
+
private final AtomicBoolean isInSafeMode;
// Used to track if the safemode pre-checks have completed. This is designed
// to prevent pipelines being created until sufficient nodes have registered.
private final AtomicBoolean pipelineCreationAllowed;
- public SCMPipelineManager(ConfigurationSource conf, NodeManager nodeManager,
+ public SCMPipelineManager(ConfigurationSource conf,
+ NodeManager nodeManager,
+ Table<PipelineID, Pipeline> pipelineStore,
EventPublisher eventPublisher)
throws IOException {
- this(conf, nodeManager, eventPublisher, null, null);
+ this(conf, nodeManager, pipelineStore, eventPublisher, null, null);
this.stateManager = new PipelineStateManager();
this.pipelineFactory = new PipelineFactory(nodeManager,
stateManager, conf, eventPublisher);
+ this.pipelineStore = pipelineStore;
initializePipelineState();
}
protected SCMPipelineManager(ConfigurationSource conf,
NodeManager nodeManager,
- EventPublisher eventPublisher,
- PipelineStateManager pipelineStateManager,
- PipelineFactory pipelineFactory)
+ Table<PipelineID, Pipeline> pipelineStore,
+ EventPublisher eventPublisher,
+ PipelineStateManager pipelineStateManager,
+ PipelineFactory pipelineFactory)
throws IOException {
this.lock = new ReentrantReadWriteLock();
+ this.pipelineStore = pipelineStore;
this.conf = conf;
this.pipelineFactory = pipelineFactory;
this.stateManager = pipelineStateManager;
@@ -116,16 +116,6 @@ public class SCMPipelineManager implements PipelineManager
{
scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
this.backgroundPipelineCreator =
new BackgroundPipelineCreator(this, scheduler, conf);
- int cacheSize = conf.getInt(ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB,
- ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
- final File pipelineDBPath = getPipelineDBPath(conf);
- this.pipelineStore =
- MetadataStoreBuilder.newBuilder()
- .setCreateIfMissing(true)
- .setConf(conf)
- .setDbFile(pipelineDBPath)
- .setCacheSize(cacheSize * OzoneConsts.MB)
- .build();
this.eventPublisher = eventPublisher;
this.nodeManager = nodeManager;
this.metrics = SCMPipelineMetrics.create();
@@ -168,18 +158,10 @@ public class SCMPipelineManager implements
PipelineManager {
LOG.info("No pipeline exists in current db");
return;
}
- List<Map.Entry<byte[], byte[]>> pipelines =
- pipelineStore.getSequentialRangeKVs(null, Integer.MAX_VALUE,
- (MetadataKeyFilters.MetadataKeyFilter[])null);
-
- for (Map.Entry<byte[], byte[]> entry : pipelines) {
- HddsProtos.Pipeline.Builder pipelineBuilder = HddsProtos.Pipeline
- .newBuilder(HddsProtos.Pipeline.PARSER.parseFrom(entry.getValue()));
- Pipeline pipeline = Pipeline.getFromProtobuf(pipelineBuilder.setState(
- HddsProtos.PipelineState.PIPELINE_ALLOCATED).build());
- // When SCM is restarted, set Creation time with current time.
- pipeline.setCreationTimestamp(Instant.now());
- Preconditions.checkNotNull(pipeline);
+ TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>>
+ iterator = pipelineStore.iterator();
+ while (iterator.hasNext()) {
+ Pipeline pipeline = iterator.next().getValue();
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
}
@@ -231,8 +213,7 @@ public class SCMPipelineManager implements PipelineManager {
lock.writeLock().lock();
try {
Pipeline pipeline = pipelineFactory.create(type, factor);
- pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
- pipeline.getProtobufMessage().toByteArray());
+ pipelineStore.put(pipeline.getId(), pipeline);
stateManager.addPipeline(pipeline);
nodeManager.addPipeline(pipeline);
recordMetricsForPipeline(pipeline);
@@ -588,11 +569,10 @@ public class SCMPipelineManager implements
PipelineManager {
* @throws IOException
*/
protected void removePipeline(PipelineID pipelineId) throws IOException {
- byte[] key = pipelineId.getProtobuf().toByteArray();
lock.writeLock().lock();
try {
if (pipelineStore != null) {
- pipelineStore.delete(key);
+ pipelineStore.delete(pipelineId);
Pipeline pipeline = stateManager.removePipeline(pipelineId);
nodeManager.removePipeline(pipeline);
metrics.incNumPipelineDestroyed();
@@ -617,16 +597,6 @@ public class SCMPipelineManager implements PipelineManager
{
scheduler = null;
}
- lock.writeLock().lock();
- try {
- if (pipelineStore != null) {
- pipelineStore.close();
- pipelineStore = null;
- }
- } finally {
- lock.writeLock().unlock();
- }
-
if(pmInfoBean != null) {
MBeans.unregister(this.pmInfoBean);
pmInfoBean = null;
@@ -638,11 +608,6 @@ public class SCMPipelineManager implements PipelineManager
{
pipelineFactory.shutdown();
}
- protected File getPipelineDBPath(ConfigurationSource configuration) {
- File metaDir = ServerUtils.getScmDbDir(configuration);
- return new File(metaDir, SCM_PIPELINE_DB);
- }
-
protected ReadWriteLock getLock() {
return lock;
}
@@ -652,10 +617,6 @@ public class SCMPipelineManager implements PipelineManager
{
return pipelineFactory;
}
- protected MetadataStore getPipelineStore() {
- return pipelineStore;
- }
-
protected NodeManager getNodeManager() {
return nodeManager;
}
@@ -665,6 +626,10 @@ public class SCMPipelineManager implements PipelineManager
{
return this.isInSafeMode.get();
}
+ public Table<PipelineID, Pipeline> getPipelineStore() {
+ return pipelineStore;
+ }
+
@Override
public synchronized void handleSafeModeTransition(
SCMSafeModeManager.SafeModeStatus status) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 1f2305f..8498a25 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -406,13 +406,19 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
pipelineManager = configurator.getPipelineManager();
} else {
pipelineManager =
- new SCMPipelineManager(conf, scmNodeManager, eventQueue);
+ new SCMPipelineManager(conf, scmNodeManager,
+ scmMetadataStore.getPipelineTable(),
+ eventQueue);
}
if (configurator.getContainerManager() != null) {
containerManager = configurator.getContainerManager();
} else {
- containerManager = new SCMContainerManager(conf, pipelineManager);
+ containerManager =
+ new SCMContainerManager(conf,
+ scmMetadataStore.getContainerTable(),
+ scmMetadataStore.getBatchHandler(),
+ pipelineManager);
}
if (configurator.getScmBlockManager() != null) {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index 6a19ff9..c9b5fde 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -29,8 +29,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
@@ -40,9 +39,11 @@ import
org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
+import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreRDBImpl;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
@@ -55,6 +56,9 @@ import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
import org.apache.hadoop.test.GenericTestUtils;
+
+import static org.apache.hadoop.ozone.OzoneConsts.GB;
+import static org.apache.hadoop.ozone.OzoneConsts.MB;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@@ -63,10 +67,6 @@ import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.rules.TemporaryFolder;
-import static org.apache.hadoop.ozone.OzoneConsts.GB;
-import static org.apache.hadoop.ozone.OzoneConsts.MB;
-
-
/**
* Tests for SCM Block Manager.
*/
@@ -88,6 +88,7 @@ public class TestBlockManager {
@Rule
public TemporaryFolder folder= new TemporaryFolder();
+ private SCMMetadataStore scmMetadataStore;
@Before
public void setUp() throws Exception {
@@ -105,16 +106,25 @@ public class TestBlockManager {
// Override the default Node Manager in SCM with this Mock Node Manager.
nodeManager = new MockNodeManager(true, 10);
eventQueue = new EventQueue();
+
+ scmMetadataStore = new SCMMetadataStoreRDBImpl(conf);
+ scmMetadataStore.start(conf);
pipelineManager =
- new SCMPipelineManager(conf, nodeManager, eventQueue);
+ new SCMPipelineManager(conf, nodeManager,
+ scmMetadataStore.getPipelineTable(),
+ eventQueue);
pipelineManager.allowPipelineCreation();
+
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf, eventQueue);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
SCMContainerManager containerManager =
- new SCMContainerManager(conf, pipelineManager);
+ new SCMContainerManager(conf,
+ scmMetadataStore.getContainerTable(),
+ scmMetadataStore.getStore(),
+ pipelineManager);
SCMSafeModeManager safeModeManager = new SCMSafeModeManager(conf,
containerManager.getContainers(), pipelineManager, eventQueue) {
@Override
@@ -127,6 +137,7 @@ public class TestBlockManager {
configurator.setPipelineManager(pipelineManager);
configurator.setContainerManager(containerManager);
configurator.setScmSafeModeManager(safeModeManager);
+ configurator.setMetadataStore(scmMetadataStore);
scm = TestUtils.getScm(conf, configurator);
// Initialize these fields so that the tests can pass.
@@ -145,10 +156,11 @@ public class TestBlockManager {
}
@After
- public void cleanup() {
+ public void cleanup() throws Exception {
scm.stop();
scm.join();
eventQueue.close();
+ scmMetadataStore.stop();
}
@Test
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index f567500..09b41a5 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -28,10 +28,13 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
@@ -58,6 +61,7 @@ public class TestCloseContainerEventHandler {
private static long size;
private static File testDir;
private static EventQueue eventQueue;
+ private static DBStore dbStore;
@BeforeClass
public static void setUp() throws Exception {
@@ -71,15 +75,19 @@ public class TestCloseContainerEventHandler {
configuration.setInt(ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT, 16);
nodeManager = new MockNodeManager(true, 10);
eventQueue = new EventQueue();
+ dbStore =
+ DBStoreBuilder.createDBStore(configuration, new SCMDBDefinition());
pipelineManager =
- new SCMPipelineManager(configuration, nodeManager, eventQueue);
+ new SCMPipelineManager(configuration, nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(dbStore), eventQueue);
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), configuration, eventQueue);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
- containerManager = new SCMContainerManager(configuration, pipelineManager);
+ containerManager = new SCMContainerManager(configuration,
+ SCMDBDefinition.CONTAINERS.getTable(dbStore), dbStore,
pipelineManager);
pipelineManager.triggerPipelineCreation();
eventQueue.addHandler(CLOSE_CONTAINER,
new CloseContainerEventHandler(pipelineManager, containerManager));
@@ -97,6 +105,9 @@ public class TestCloseContainerEventHandler {
if (pipelineManager != null) {
pipelineManager.close();
}
+ if (dbStore != null) {
+ dbStore.close();
+ }
FileUtil.fullyDelete(testDir);
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index 75d2712..1821e92 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -44,9 +44,12 @@ import
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.XceiverClientManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.GenericTestUtils;
@@ -59,6 +62,7 @@ import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
+
/**
* Tests for Container ContainerManager.
*/
@@ -76,6 +80,7 @@ public class TestSCMContainerManager {
@Rule
public ExpectedException thrown = ExpectedException.none();
+
@BeforeClass
public static void setUp() throws Exception {
OzoneConfiguration conf = SCMTestUtils.getConf();
@@ -93,10 +98,15 @@ public class TestSCMContainerManager {
throw new IOException("Unable to create test directory path");
}
nodeManager = new MockNodeManager(true, 10);
+ DBStore dbStore = DBStoreBuilder.createDBStore(conf, new
SCMDBDefinition());
pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue());
+ new SCMPipelineManager(conf, nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(dbStore), new EventQueue());
pipelineManager.allowPipelineCreation();
- containerManager = new SCMContainerManager(conf, pipelineManager);
+ containerManager = new SCMContainerManager(conf,
+ SCMDBDefinition.CONTAINERS.getTable(dbStore),
+ dbStore,
+ pipelineManager);
xceiverClientManager = new XceiverClientManager(conf);
replicationFactor = SCMTestUtils.getReplicationFactor(conf);
replicationType = SCMTestUtils.getReplicationType(conf);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index de027ed..6ce66a2 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -36,19 +36,24 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementCapacity;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.commons.io.IOUtils;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState.HEALTHY;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
+import org.junit.After;
import static org.junit.Assert.assertEquals;
+import org.junit.Before;
import org.junit.Ignore;
import org.junit.Rule;
import org.junit.Test;
@@ -59,9 +64,21 @@ import org.mockito.Mockito;
* Test for different container placement policy.
*/
public class TestContainerPlacement {
+
@Rule
public ExpectedException thrown = ExpectedException.none();
+ private DBStore dbStore;
+
+ @Before
+ public void createDbStore() throws IOException {
+ dbStore =
+ DBStoreBuilder.createDBStore(getConf(), new SCMDBDefinition());
+ }
+ @After
+ public void destroyDBStore() throws Exception {
+ dbStore.close();
+ }
/**
* Returns a new copy of Configuration.
*
@@ -100,11 +117,13 @@ public class TestContainerPlacement {
SCMContainerManager createContainerManager(ConfigurationSource config,
NodeManager scmNodeManager) throws IOException {
EventQueue eventQueue = new EventQueue();
- final int cacheSize = config.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
- OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
+
PipelineManager pipelineManager =
- new SCMPipelineManager(config, scmNodeManager, eventQueue);
- return new SCMContainerManager(config, pipelineManager);
+ new SCMPipelineManager(config, scmNodeManager,
+ PIPELINES.getTable(dbStore), eventQueue);
+ return new SCMContainerManager(config, CONTAINERS.getTable(dbStore),
+ dbStore,
+ pipelineManager);
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index fcb1c94..007f071 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -37,9 +37,12 @@ import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.test.GenericTestUtils;
@@ -55,6 +58,7 @@ import static org.junit.Assert.fail;
import org.junit.Before;
import org.junit.Test;
+
/**
* Test cases to verify PipelineManager.
*/
@@ -62,6 +66,7 @@ public class TestSCMPipelineManager {
private static MockNodeManager nodeManager;
private static File testDir;
private static OzoneConfiguration conf;
+ private DBStore store;
@Before
public void setUp() throws Exception {
@@ -76,17 +81,24 @@ public class TestSCMPipelineManager {
throw new IOException("Unable to create test directory path");
}
nodeManager = new MockNodeManager(true, 20);
+
+ store = DBStoreBuilder.createDBStore(conf, new SCMDBDefinition());
+
}
@After
- public void cleanup() {
+ public void cleanup() throws Exception {
+ store.close();
FileUtil.fullyDelete(testDir);
}
@Test
public void testPipelineReload() throws IOException {
SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue());
+ new SCMPipelineManager(conf,
+ nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(store),
+ new EventQueue());
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -106,7 +118,8 @@ public class TestSCMPipelineManager {
// new pipeline manager should be able to load the pipelines from the db
pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue());
+ new SCMPipelineManager(conf, nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
pipelineManager.allowPipelineCreation();
mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -137,7 +150,8 @@ public class TestSCMPipelineManager {
@Test
public void testRemovePipeline() throws IOException {
SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue());
+ new SCMPipelineManager(conf, nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -156,7 +170,8 @@ public class TestSCMPipelineManager {
// new pipeline manager should not be able to load removed pipelines
pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue());
+ new SCMPipelineManager(conf, nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
try {
pipelineManager.getPipeline(pipeline.getId());
fail("Pipeline should not have been retrieved");
@@ -172,7 +187,8 @@ public class TestSCMPipelineManager {
public void testPipelineReport() throws IOException {
EventQueue eventQueue = new EventQueue();
SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager, eventQueue);
+ new SCMPipelineManager(conf, nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -238,7 +254,8 @@ public class TestSCMPipelineManager {
MockNodeManager nodeManagerMock = new MockNodeManager(true,
20);
SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManagerMock, new EventQueue());
+ new SCMPipelineManager(conf, nodeManagerMock,
+ SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManagerMock,
@@ -297,7 +314,8 @@ public class TestSCMPipelineManager {
@Test
public void testActivateDeactivatePipeline() throws IOException {
final SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager, new EventQueue());
+ new SCMPipelineManager(conf, nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(store), new EventQueue());
pipelineManager.allowPipelineCreation();
final PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -345,7 +363,8 @@ public class TestSCMPipelineManager {
public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
EventQueue eventQueue = new EventQueue();
SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager, eventQueue);
+ new SCMPipelineManager(conf, nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -361,7 +380,8 @@ public class TestSCMPipelineManager {
pipelineManager.close();
// new pipeline manager loads the pipelines from the db in ALLOCATED state
pipelineManager =
- new SCMPipelineManager(conf, nodeManager, eventQueue);
+ new SCMPipelineManager(conf, nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
@@ -406,7 +426,8 @@ public class TestSCMPipelineManager {
EventQueue eventQueue = new EventQueue();
final SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager, eventQueue);
+ new SCMPipelineManager(conf, nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
pipelineManager.allowPipelineCreation();
final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
@@ -448,8 +469,9 @@ public class TestSCMPipelineManager {
TimeUnit.MILLISECONDS);
EventQueue eventQueue = new EventQueue();
- final SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager, eventQueue);
+ SCMPipelineManager pipelineManager =
+ new SCMPipelineManager(conf, nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
false);
@@ -484,7 +506,6 @@ public class TestSCMPipelineManager {
pipelineManager.close();
}
-
@Test
public void testSafeModeUpdatedOnSafemodeExit()
throws IOException, TimeoutException, InterruptedException {
@@ -494,8 +515,9 @@ public class TestSCMPipelineManager {
TimeUnit.MILLISECONDS);
EventQueue eventQueue = new EventQueue();
- final SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager, eventQueue);
+ SCMPipelineManager pipelineManager =
+ new SCMPipelineManager(conf, nodeManager,
+ SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
false);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index bda6f84..700479d 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -18,6 +18,11 @@
package org.apache.hadoop.hdds.scm.safemode;
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -26,21 +31,20 @@ import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.test.GenericTestUtils;
+
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.LoggerFactory;
-import java.io.File;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.UUID;
-
/**
* This class tests HealthyPipelineSafeMode rule.
*/
@@ -49,7 +53,7 @@ public class TestHealthyPipelineSafeModeRule {
@Test
public void testHealthyPipelineSafeModeRuleWithNoPipelines()
throws Exception {
-
+ DBStore store = null;
String storageDir = GenericTestUtils.getTempPath(
TestHealthyPipelineSafeModeRule.class.getName() + UUID.randomUUID());
try {
@@ -65,9 +69,9 @@ public class TestHealthyPipelineSafeModeRule {
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
config.setBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
-
+ store = DBStoreBuilder.createDBStore(config, new SCMDBDefinition());
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
- nodeManager, eventQueue);
+ nodeManager, SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), config);
@@ -82,16 +86,16 @@ public class TestHealthyPipelineSafeModeRule {
// This should be immediately satisfied, as no pipelines are there yet.
Assert.assertTrue(healthyPipelineSafeModeRule.validate());
} finally {
+ store.close();
FileUtil.fullyDelete(new File(storageDir));
}
}
@Test
public void testHealthyPipelineSafeModeRuleWithPipelines() throws Exception {
-
String storageDir = GenericTestUtils.getTempPath(
TestHealthyPipelineSafeModeRule.class.getName() + UUID.randomUUID());
-
+ DBStore store = null;
try {
EventQueue eventQueue = new EventQueue();
List<ContainerInfo> containers =
@@ -109,8 +113,9 @@ public class TestHealthyPipelineSafeModeRule {
config.setBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+ store = DBStoreBuilder.createDBStore(config, new SCMDBDefinition());
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
- nodeManager, eventQueue);
+ nodeManager, SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
@@ -153,6 +158,7 @@ public class TestHealthyPipelineSafeModeRule {
GenericTestUtils.waitFor(() -> healthyPipelineSafeModeRule.validate(),
1000, 5000);
} finally {
+ store.close();
FileUtil.fullyDelete(new File(storageDir));
}
}
@@ -164,6 +170,7 @@ public class TestHealthyPipelineSafeModeRule {
String storageDir = GenericTestUtils.getTempPath(
TestHealthyPipelineSafeModeRule.class.getName() + UUID.randomUUID());
+ DBStore store = null;
try {
EventQueue eventQueue = new EventQueue();
@@ -183,8 +190,10 @@ public class TestHealthyPipelineSafeModeRule {
config.setBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
+ store = DBStoreBuilder.createDBStore(config, new SCMDBDefinition());
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
- nodeManager, eventQueue);
+ nodeManager, SCMDBDefinition.PIPELINES.getTable(store), eventQueue);
+
pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -235,6 +244,7 @@ public class TestHealthyPipelineSafeModeRule {
1000, 5000);
} finally {
+ store.close();
FileUtil.fullyDelete(new File(storageDir));
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index dd8d301..c1f09fa 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -17,28 +17,32 @@
package org.apache.hadoop.hdds.scm.safemode;
+import java.util.ArrayList;
+import java.util.List;
+
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.test.GenericTestUtils;
+
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* This class tests OneReplicaPipelineSafeModeRule.
*/
@@ -50,7 +54,6 @@ public class TestOneReplicaPipelineSafeModeRule {
private SCMPipelineManager pipelineManager;
private EventQueue eventQueue;
-
private void setup(int nodes, int pipelineFactorThreeCount,
int pipelineFactorOneCount) throws Exception {
OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
@@ -66,8 +69,13 @@ public class TestOneReplicaPipelineSafeModeRule {
MockNodeManager mockNodeManager = new MockNodeManager(true, nodes);
eventQueue = new EventQueue();
+
+ DBStore dbStore =
+ DBStoreBuilder.createDBStore(ozoneConfiguration, new
SCMDBDefinition());
+
pipelineManager =
new SCMPipelineManager(ozoneConfiguration, mockNodeManager,
+ SCMDBDefinition.PIPELINES.getTable(dbStore),
eventQueue);
pipelineManager.allowPipelineCreation();
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 0620883..9d22304 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.safemode;
import java.io.File;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -33,6 +34,7 @@ import org.apache.hadoop.hdds.scm.HddsTestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -41,8 +43,11 @@ import
org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
import org.junit.Assert;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -69,6 +74,8 @@ public class TestSCMSafeModeManager {
@Rule
public final TemporaryFolder tempDir = new TemporaryFolder();
+ private DBStore dbStore;
+
@Before
public void setUp() {
queue = new EventQueue();
@@ -77,6 +84,20 @@ public class TestSCMSafeModeManager {
false);
}
+ @Before
+ public void initDbStore() throws IOException {
+ config.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+ tempDir.newFolder().getAbsolutePath());
+ dbStore = DBStoreBuilder.createDBStore(config, new SCMDBDefinition());
+ }
+
+ @After
+ public void destroyDbStore() throws Exception {
+ if (dbStore != null) {
+ dbStore.close();
+ }
+ }
+
@Test
public void testSafeModeState() throws Exception {
// Test 1: test for 0 containers
@@ -184,12 +205,32 @@ public class TestSCMSafeModeManager {
}
@Test
- public void testSafeModeExitRuleWithPipelineAvailabilityCheck()
- throws Exception{
+ public void testSafeModeExitRuleWithPipelineAvailabilityCheck1()
+ throws Exception {
testSafeModeExitRuleWithPipelineAvailabilityCheck(100, 30, 8, 0.90, 1);
+ }
+
+ @Test
+ public void testSafeModeExitRuleWithPipelineAvailabilityCheck2()
+ throws Exception {
testSafeModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0.10, 0.9);
+ }
+
+ @Test
+ public void testSafeModeExitRuleWithPipelineAvailabilityCheck3()
+ throws Exception {
testSafeModeExitRuleWithPipelineAvailabilityCheck(100, 30, 8, 0, 0.9);
+ }
+
+ @Test
+ public void testSafeModeExitRuleWithPipelineAvailabilityCheck4()
+ throws Exception {
testSafeModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0, 0);
+ }
+
+ @Test
+ public void testSafeModeExitRuleWithPipelineAvailabilityCheck5()
+ throws Exception {
testSafeModeExitRuleWithPipelineAvailabilityCheck(100, 90, 22, 0, 0.5);
}
@@ -201,7 +242,7 @@ public class TestSCMSafeModeManager {
0.9);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager = new SCMPipelineManager(conf,
- mockNodeManager, queue);
+ mockNodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), queue);
scmSafeModeManager = new SCMSafeModeManager(
conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForHealthyPipelinePercent");
@@ -219,7 +260,7 @@ public class TestSCMSafeModeManager {
200);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager = new SCMPipelineManager(conf,
- mockNodeManager, queue);
+ mockNodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), queue);
scmSafeModeManager = new SCMSafeModeManager(
conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
@@ -236,7 +277,7 @@ public class TestSCMSafeModeManager {
conf.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, -1.0);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager = new SCMPipelineManager(conf,
- mockNodeManager, queue);
+ mockNodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), queue);
scmSafeModeManager = new SCMSafeModeManager(
conf, containers, pipelineManager, queue);
fail("testFailWithIncorrectValueForSafeModePercent");
@@ -260,7 +301,7 @@ public class TestSCMSafeModeManager {
MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
- mockNodeManager, queue);
+ mockNodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), queue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(mockNodeManager,
pipelineManager.getStateManager(), config, true);
@@ -477,7 +518,7 @@ public class TestSCMSafeModeManager {
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
- nodeManager, queue);
+ nodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), queue);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -531,7 +572,8 @@ public class TestSCMSafeModeManager {
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
SCMPipelineManager pipelineManager = new SCMPipelineManager(config,
- nodeManager, queue);
+ nodeManager, SCMDBDefinition.PIPELINES.getTable(dbStore), queue);
+
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
diff --git a/hadoop-ozone/dist/pom.xml b/hadoop-ozone/dist/pom.xml
index 8be5c5b..e892053 100644
--- a/hadoop-ozone/dist/pom.xml
+++ b/hadoop-ozone/dist/pom.xml
@@ -23,7 +23,7 @@
</parent>
<artifactId>hadoop-ozone-dist</artifactId>
<name>Apache Hadoop Ozone Distribution</name>
- <packaging>pom</packaging>
+ <packaging>jar</packaging>
<version>0.6.0-SNAPSHOT</version>
<properties>
<file.encoding>UTF-8</file.encoding>
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
index 20afa09..aa524b0 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconContainerManager.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.recon.scm;
-import java.io.File;
import java.io.IOException;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
@@ -30,11 +29,11 @@ import
org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.SCMContainerManager;
import
org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.hdds.utils.db.BatchOperationHandler;
+import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
-import static
org.apache.hadoop.ozone.recon.ReconConstants.RECON_SCM_CONTAINER_DB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,33 +55,29 @@ public class ReconContainerManager extends
SCMContainerManager {
* CacheSize is specified
* in MB.
*
- * @param conf - {@link ConfigurationSource}
- * @param pipelineManager - {@link PipelineManager}
* @throws IOException on Failure.
*/
public ReconContainerManager(
- ConfigurationSource conf, PipelineManager pipelineManager,
+ ConfigurationSource conf,
+ Table<ContainerID, ContainerInfo> containerStore,
+ BatchOperationHandler batchHandler,
+ PipelineManager pipelineManager,
StorageContainerServiceProvider scm,
ContainerSchemaManager containerSchemaManager) throws IOException {
- super(conf, pipelineManager);
+ super(conf, containerStore, batchHandler, pipelineManager);
this.scmClient = scm;
this.containerSchemaManager = containerSchemaManager;
}
- @Override
- protected File getContainerDBPath(ConfigurationSource conf) {
- File metaDir = ReconUtils.getReconScmDbDir(conf);
- return new File(metaDir, RECON_SCM_CONTAINER_DB);
- }
-
/**
* Check and add new container if not already present in Recon.
- * @param containerID containerID to check.
+ *
+ * @param containerID containerID to check.
* @param datanodeDetails Datanode from where we got this container.
* @throws IOException on Error.
*/
public void checkAndAddNewContainer(ContainerID containerID,
- DatanodeDetails datanodeDetails)
+ DatanodeDetails datanodeDetails)
throws IOException {
if (!exists(containerID)) {
LOG.info("New container {} got from {}.", containerID,
@@ -143,7 +138,7 @@ public class ReconContainerManager extends
SCMContainerManager {
*/
@Override
public void updateContainerReplica(ContainerID containerID,
- ContainerReplica replica)
+ ContainerReplica replica)
throws ContainerNotFoundException {
super.updateContainerReplica(containerID, replica);
// Update container_history table
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDBDefinition.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDBDefinition.java
new file mode 100644
index 0000000..bcfe060
--- /dev/null
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconDBDefinition.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.ozone.recon.scm;
+
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.ozone.recon.ReconServerConfigKeys;
+
+/**
+ * SCM db file for ozone.
+ */
+public class ReconDBDefinition extends SCMDBDefinition {
+
+ @Override
+ public String getName() {
+ return "recon-scm.db";
+ }
+
+ @Override
+ public String getLocationConfigKey() {
+ return ReconServerConfigKeys.OZONE_RECON_SCM_DB_DIR;
+ }
+}
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index 20f77c7..a8dd3c9 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.recon.scm;
-import java.io.File;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
@@ -31,14 +30,15 @@ import
org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManager;
import org.apache.hadoop.hdds.scm.pipeline.SCMPipelineManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.ozone.recon.ReconUtils;
+import org.apache.hadoop.hdds.utils.db.Table;
import com.google.common.annotations.VisibleForTesting;
import static
org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
-import static
org.apache.hadoop.ozone.recon.ReconConstants.RECON_SCM_PIPELINE_DB;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
+
/**
* Recon's overriding implementation of SCM's Pipeline Manager.
*/
@@ -48,20 +48,16 @@ public class ReconPipelineManager extends
SCMPipelineManager {
LoggerFactory.getLogger(ReconPipelineManager.class);
public ReconPipelineManager(ConfigurationSource conf,
- NodeManager nodeManager,
- EventPublisher eventPublisher)
+ NodeManager nodeManager,
+ Table<PipelineID, Pipeline> pipelineStore,
+ EventPublisher eventPublisher)
throws IOException {
- super(conf, nodeManager, eventPublisher, new PipelineStateManager(),
+ super(conf, nodeManager, pipelineStore, eventPublisher,
+ new PipelineStateManager(),
new ReconPipelineFactory());
initializePipelineState();
}
-
- @Override
- protected File getPipelineDBPath(ConfigurationSource conf) {
- File metaDir = ReconUtils.getReconScmDbDir(conf);
- return new File(metaDir, RECON_SCM_PIPELINE_DB);
- }
-
+
@Override
public void triggerPipelineCreation() {
// Don't do anything in Recon.
@@ -147,8 +143,7 @@ public class ReconPipelineManager extends
SCMPipelineManager {
void addPipeline(Pipeline pipeline) throws IOException {
getLock().writeLock().lock();
try {
- getPipelineStore().put(pipeline.getId().getProtobuf().toByteArray(),
- pipeline.getProtobufMessage().toByteArray());
+ getPipelineStore().put(pipeline.getId(), pipeline);
getStateManager().addPipeline(pipeline);
getNodeManager().addPipeline(pipeline);
} finally {
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
index 61de428..7800abb 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconStorageContainerManagerFacade.java
@@ -18,9 +18,6 @@
package org.apache.hadoop.ozone.recon.scm;
-import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.RECON_SCM_CONFIG_PREFIX;
-import static
org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
-
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
@@ -28,8 +25,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
-
-import com.google.inject.Inject;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.block.BlockManager;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
@@ -49,13 +44,19 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.safemode.SafeModeManager;
-import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager;
+import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ozone.recon.fsck.MissingContainerTask;
import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+
+import com.google.inject.Inject;
+import static
org.apache.hadoop.hdds.recon.ReconConfigKeys.RECON_SCM_CONFIG_PREFIX;
+import static
org.apache.hadoop.hdds.scm.server.StorageContainerManager.buildRpcServerStartMessage;
import org.hadoop.ozone.recon.schema.tables.daos.ReconTaskStatusDao;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -73,6 +74,7 @@ public class ReconStorageContainerManagerFacade
private final ReconDatanodeProtocolServer datanodeProtocolServer;
private final EventQueue eventQueue;
private final SCMStorageConfig scmStorageConfig;
+ private final DBStore dbStore;
private ReconNodeManager nodeManager;
private ReconPipelineManager pipelineManager;
@@ -83,23 +85,34 @@ public class ReconStorageContainerManagerFacade
@Inject
public ReconStorageContainerManagerFacade(OzoneConfiguration conf,
- StorageContainerServiceProvider scmServiceProvider,
- ReconTaskStatusDao reconTaskStatusDao,
- ContainerSchemaManager containerSchemaManager)
+ StorageContainerServiceProvider scmServiceProvider,
+ ReconTaskStatusDao reconTaskStatusDao,
+ ContainerSchemaManager containerSchemaManager)
throws IOException {
this.eventQueue = new EventQueue();
eventQueue.setSilent(true);
this.ozoneConfiguration = getReconScmConfiguration(conf);
this.scmStorageConfig = new ReconStorageConfig(conf);
this.clusterMap = new NetworkTopologyImpl(conf);
+ dbStore = DBStoreBuilder
+ .createDBStore(ozoneConfiguration, new ReconDBDefinition());
+
this.nodeManager =
new ReconNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
this.datanodeProtocolServer = new ReconDatanodeProtocolServer(
conf, this, eventQueue);
this.pipelineManager =
- new ReconPipelineManager(conf, nodeManager, eventQueue);
- this.containerManager = new ReconContainerManager(conf, pipelineManager,
- scmServiceProvider, containerSchemaManager);
+
+ new ReconPipelineManager(conf,
+ nodeManager,
+ ReconDBDefinition.PIPELINES.getTable(dbStore),
+ eventQueue);
+ this.containerManager = new ReconContainerManager(conf,
+ ReconDBDefinition.CONTAINERS.getTable(dbStore),
+ dbStore,
+ pipelineManager,
+ scmServiceProvider,
+ containerSchemaManager);
this.scmServiceProvider = scmServiceProvider;
NodeReportHandler nodeReportHandler =
@@ -214,6 +227,11 @@ public class ReconStorageContainerManagerFacade
IOUtils.cleanupWithLogger(LOG, nodeManager);
IOUtils.cleanupWithLogger(LOG, containerManager);
IOUtils.cleanupWithLogger(LOG, pipelineManager);
+ try {
+ dbStore.close();
+ } catch (Exception e) {
+ LOG.error("Can't close dbStore ", e);
+ }
}
public ReconDatanodeProtocolServer getDatanodeProtocolServer() {
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
index 7f87806..04010e5 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/AbstractReconContainerManagerTest.java
@@ -18,15 +18,6 @@
package org.apache.hadoop.ozone.recon.scm;
-import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
-import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
-import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
-import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
import java.io.IOException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -40,12 +31,23 @@ import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.recon.persistence.ContainerSchemaManager;
import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider;
+
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState.OPEN;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor.ONE;
+import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType.STAND_ALONE;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
+import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.rules.TemporaryFolder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
/**
* Abstract class for Recon Container Manager related tests.
@@ -59,6 +61,7 @@ public class AbstractReconContainerManagerTest {
private SCMStorageConfig scmStorageConfig;
private ReconPipelineManager pipelineManager;
private ReconContainerManager containerManager;
+ private DBStore store;
@Before
public void setUp() throws Exception {
@@ -66,20 +69,28 @@ public class AbstractReconContainerManagerTest {
conf.set(OZONE_METADATA_DIRS,
temporaryFolder.newFolder().getAbsolutePath());
conf.set(OZONE_SCM_NAMES, "localhost");
+ store = DBStoreBuilder.createDBStore(conf, new ReconDBDefinition());
scmStorageConfig = new ReconStorageConfig(conf);
NetworkTopology clusterMap = new NetworkTopologyImpl(conf);
EventQueue eventQueue = new EventQueue();
NodeManager nodeManager =
new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
- pipelineManager = new ReconPipelineManager(conf, nodeManager, eventQueue);
- containerManager = new ReconContainerManager(conf, pipelineManager,
- getScmServiceProvider(), mock(ContainerSchemaManager.class));
+ pipelineManager = new ReconPipelineManager(conf, nodeManager,
+ ReconDBDefinition.PIPELINES.getTable(store), eventQueue);
+ containerManager = new ReconContainerManager(
+ conf,
+ ReconDBDefinition.CONTAINERS.getTable(store),
+ store,
+ pipelineManager,
+ getScmServiceProvider(),
+ mock(ContainerSchemaManager.class));
}
@After
- public void tearDown() throws IOException {
+ public void tearDown() throws Exception {
containerManager.close();
pipelineManager.close();
+ store.close();
}
protected OzoneConfiguration getConf() {
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
index 3d4d239..c891f33 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconPipelineManager.java
@@ -18,15 +18,6 @@
package org.apache.hadoop.ozone.recon.scm;
-import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
-import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
-import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -45,11 +36,23 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import
org.apache.hadoop.ozone.recon.scm.ReconPipelineFactory.ReconPipelineProvider;
+
+import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_NAMES;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_METADATA_DIRS;
+import static
org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline;
+import org.junit.After;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
+import static org.mockito.Mockito.mock;
/**
* Class to test Recon Pipeline Manager.
@@ -61,6 +64,7 @@ public class TestReconPipelineManager {
private OzoneConfiguration conf;
private SCMStorageConfig scmStorageConfig;
+ private DBStore store;
@Before
public void setup() throws IOException {
@@ -69,6 +73,12 @@ public class TestReconPipelineManager {
temporaryFolder.newFolder().getAbsolutePath());
conf.set(OZONE_SCM_NAMES, "localhost");
scmStorageConfig = new ReconStorageConfig(conf);
+ store = DBStoreBuilder.createDBStore(conf, new ReconDBDefinition());
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ store.close();
}
@Test
@@ -103,7 +113,8 @@ public class TestReconPipelineManager {
new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
try (ReconPipelineManager reconPipelineManager =
- new ReconPipelineManager(conf, nodeManager, eventQueue)) {
+ new ReconPipelineManager(conf, nodeManager,
+ ReconDBDefinition.PIPELINES.getTable(store), eventQueue)) {
reconPipelineManager.addPipeline(validPipeline);
reconPipelineManager.addPipeline(invalidPipeline);
@@ -138,7 +149,8 @@ public class TestReconPipelineManager {
new SCMNodeManager(conf, scmStorageConfig, eventQueue, clusterMap);
ReconPipelineManager reconPipelineManager =
- new ReconPipelineManager(conf, nodeManager, eventQueue);
+ new ReconPipelineManager(conf, nodeManager,
+ ReconDBDefinition.PIPELINES.getTable(store), eventQueue);
assertFalse(reconPipelineManager.containsPipeline(pipeline.getId()));
reconPipelineManager.addPipeline(pipeline);
assertTrue(reconPipelineManager.containsPipeline(pipeline.getId()));
@@ -150,7 +162,8 @@ public class TestReconPipelineManager {
NodeManager nodeManagerMock = mock(NodeManager.class);
ReconPipelineManager reconPipelineManager = new ReconPipelineManager(
- conf, nodeManagerMock, new EventQueue());
+ conf, nodeManagerMock, ReconDBDefinition.PIPELINES.getTable(store),
+ new EventQueue());
PipelineFactory pipelineFactory =
reconPipelineManager.getPipelineFactory();
assertTrue(pipelineFactory instanceof ReconPipelineFactory);
ReconPipelineFactory reconPipelineFactory =
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkOzoneManager.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkOzoneManager.java
index 6e531f3..19b5e8e 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkOzoneManager.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkOzoneManager.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.ozone.genesis;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
-
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
@@ -27,7 +25,6 @@ import java.util.List;
import java.util.UUID;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -47,7 +44,9 @@ import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.security.UserGroupInformation;
-import
org.apache.hadoop.security.authentication.client.AuthenticationException;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Scope;
@@ -77,7 +76,7 @@ public class BenchMarkOzoneManager {
@Setup(Level.Trial)
public static void initialize()
- throws IOException, AuthenticationException, InterruptedException {
+ throws Exception {
try {
lock.lock();
if (scm == null) {
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkSCM.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkSCM.java
index 0839ea5..64e2f4d 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkSCM.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/BenchMarkSCM.java
@@ -18,13 +18,10 @@
package org.apache.hadoop.ozone.genesis;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
-
import java.io.File;
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.lang3.RandomStringUtils;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
@@ -38,7 +35,9 @@ import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import
org.apache.hadoop.security.authentication.client.AuthenticationException;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_OWNER_CONTAINER_COUNT;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Param;
@@ -67,7 +66,7 @@ public class BenchMarkSCM {
@Setup(Level.Trial)
public static void initialize()
- throws IOException, AuthenticationException, InterruptedException {
+ throws Exception {
try {
lock.lock();
if (scm == null) {
diff --git
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/GenesisUtil.java
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/GenesisUtil.java
index e98cabc..797c805 100644
---
a/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/GenesisUtil.java
+++
b/hadoop-ozone/tools/src/main/java/org/apache/hadoop/ozone/genesis/GenesisUtil.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.ozone.genesis;
-import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.Paths;
@@ -32,15 +31,17 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.hdds.utils.MetadataStore;
import org.apache.hadoop.hdds.utils.MetadataStoreBuilder;
-import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.hdds.utils.db.DBStore;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
+import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.OMStorage;
@@ -48,9 +49,7 @@ import org.apache.hadoop.ozone.om.OzoneManager;
import
org.apache.hadoop.security.authentication.client.AuthenticationException;
import org.apache.commons.lang3.RandomStringUtils;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_DEFAULT;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_DB_CACHE_SIZE_MB;
-import static org.apache.hadoop.ozone.OzoneConsts.SCM_PIPELINE_DB;
+
/**
* Utility class for benchmark test cases.
@@ -150,16 +149,11 @@ public final class GenesisUtil {
}
static void addPipelines(HddsProtos.ReplicationFactor factor,
- int numPipelines, ConfigurationSource conf) throws IOException {
- final File metaDir = ServerUtils.getScmDbDir(conf);
- final File pipelineDBPath = new File(metaDir, SCM_PIPELINE_DB);
- int cacheSize = conf.getInt(OZONE_SCM_DB_CACHE_SIZE_MB,
- OZONE_SCM_DB_CACHE_SIZE_DEFAULT);
- MetadataStore pipelineStore =
- MetadataStoreBuilder.newBuilder().setCreateIfMissing(true)
- .setConf(conf).setDbFile(pipelineDBPath)
- .setCacheSize(cacheSize * OzoneConsts.MB).build();
+ int numPipelines, ConfigurationSource conf) throws Exception {
+ DBStore dbStore = DBStoreBuilder.createDBStore(conf, new
SCMDBDefinition());
+ Table<PipelineID, Pipeline> pipelineTable =
+ SCMDBDefinition.PIPELINES.getTable(dbStore);
List<DatanodeDetails> nodes = new ArrayList<>();
for (int i = 0; i < factor.getNumber(); i++) {
nodes
@@ -174,11 +168,11 @@ public final class GenesisUtil {
.setFactor(factor)
.setNodes(nodes)
.build();
- pipelineStore.put(pipeline.getId().getProtobuf().toByteArray(),
- pipeline.getProtobufMessage().toByteArray());
+ pipelineTable.put(pipeline.getId(),
+ pipeline);
}
- pipelineStore.close();
+ dbStore.close();
}
static OzoneManager getOm(OzoneConfiguration conf)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]