wangchao316 commented on a change in pull request #5295:
URL: https://github.com/apache/iotdb/pull/5295#discussion_r830784838



##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.rocksdb;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
+import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.logfile.MLogReader;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.mtree.MTree;
+import 
org.apache.iotdb.db.metadata.mtree.traverser.collector.MeasurementCollector;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MetaDataTransfer {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MetaDataTransfer.class);
+
+  private static int DEFAULT_TRANSFER_THREAD_POOL_SIZE = 200;
+  private static int DEFAULT_TRANSFER_PLANS_BUFFER_SIZE = 100_000;
+
+  private ForkJoinPool forkJoinPool = new 
ForkJoinPool(DEFAULT_TRANSFER_THREAD_POOL_SIZE);
+
+  private String mtreeSnapshotPath;
+  private MRocksDBManager rocksDBManager;
+  private MLogWriter mLogWriter;
+  private String failedMLogPath =
+      IoTDBDescriptor.getInstance().getConfig().getSchemaDir()
+          + File.separator
+          + MetadataConstant.METADATA_LOG
+          + ".transfer_failed";
+
+  private String idxFilePath =
+      RocksDBReadWriteHandler.ROCKSDB_PATH + File.separator + 
"transfer_mlog.idx";
+
+  private AtomicInteger failedPlanCount = new AtomicInteger(0);
+  private List<PhysicalPlan> retryPlans = new ArrayList<>();
+
+  MetaDataTransfer() throws MetadataException {
+    rocksDBManager = new MRocksDBManager();
+  }
+
+  public static void main(String[] args) {
+    try {
+      MetaDataTransfer transfer = new MetaDataTransfer();
+      transfer.doTransfer();
+    } catch (MetadataException | IOException | ExecutionException | 
InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void doTransfer() throws IOException, ExecutionException, 
InterruptedException {
+    File failedFile = new File(failedMLogPath);
+    if (failedFile.exists()) {
+      failedFile.delete();
+    }
+
+    mLogWriter = new MLogWriter(failedMLogPath);
+    mLogWriter.setLogNum(0);
+
+    String schemaDir = 
IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
+    File schemaFolder = SystemFileFactory.INSTANCE.getFile(schemaDir);
+    if (!schemaFolder.exists()) {
+      if (schemaFolder.mkdirs()) {
+        logger.info("create system folder {}", schemaFolder.getAbsolutePath());
+      } else {
+        logger.info("create system folder {} failed.", 
schemaFolder.getAbsolutePath());
+      }
+    }
+
+    mtreeSnapshotPath = schemaDir + File.separator + 
MetadataConstant.MTREE_SNAPSHOT;
+    File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
+    if (mtreeSnapshot.exists()) {
+      try {
+        doTransferFromSnapshot();
+      } catch (MetadataException e) {
+        logger.error("Fatal error, terminate data transfer!!!", e);
+      }
+    }
+
+    String logFilePath = schemaDir + File.separator + 
MetadataConstant.METADATA_LOG;
+    File logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    // init the metadata from the operation log
+    if (logFile.exists()) {
+      try (MLogReader mLogReader = new MLogReader(schemaDir, 
MetadataConstant.METADATA_LOG)) {
+        int startIdx = 0;
+        File idxFile = new File(idxFilePath);
+        if (idxFile.exists()) {
+          try (BufferedReader br = new BufferedReader(new 
FileReader(idxFile))) {
+            String idxStr = br.readLine();
+            if (StringUtils.isNotEmpty(idxStr)) {
+              startIdx = Integer.valueOf(idxStr);
+            }
+          }
+        }
+        transferFromMLog(mLogReader, startIdx);
+      } catch (Exception e) {
+        throw new IOException("Failed to parser mlog.bin for err:" + e);
+      }
+    } else {
+      logger.info("No mlog.bin file find, skip data transfer");
+    }
+    mLogWriter.close();
+
+    logger.info("Transfer metadata from MManager to MRocksDBManager 
complete!");
+  }
+
+  private void transferFromMLog(MLogReader mLogReader, long startIdx)
+      throws IOException, MetadataException, ExecutionException, 
InterruptedException {
+    long time = System.currentTimeMillis();
+    logger.info("start from {} to transfer data from mlog.bin", startIdx);
+    int currentIdx = 0;
+    PhysicalPlan plan;
+    List<PhysicalPlan> nonCollisionCollections = new ArrayList<>();
+    while (mLogReader.hasNext()) {
+      try {
+        plan = mLogReader.next();
+        currentIdx++;
+        if (currentIdx <= startIdx) {
+          continue;
+        }
+      } catch (Exception e) {
+        logger.error("Parse mlog error at lineNumber {} because:", currentIdx, 
e);
+        throw e;
+      }
+      if (plan == null) {
+        continue;
+      }
+
+      switch (plan.getOperatorType()) {
+        case CREATE_TIMESERIES:
+        case CREATE_ALIGNED_TIMESERIES:
+        case AUTO_CREATE_DEVICE_MNODE:
+          nonCollisionCollections.add(plan);
+          if (nonCollisionCollections.size() > 
DEFAULT_TRANSFER_PLANS_BUFFER_SIZE) {
+            executeBufferedOperation(nonCollisionCollections);
+          }
+          break;
+        case SET_STORAGE_GROUP:
+        case TTL:
+        case CHANGE_ALIAS:
+        case DELETE_TIMESERIES:
+          executeBufferedOperation(nonCollisionCollections);
+          try {
+            rocksDBManager.operation(plan);
+          } catch (IOException e) {
+            rocksDBManager.operation(plan);

Review comment:
       add a retry log.

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/IMetaManager.java
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class takes the responsibility of serialization of all the metadata 
info and persistent it
+ * into files. This class contains all the interfaces to modify the metadata 
for delta system. All
+ * the operations will be insert into the logs temporary in case the downtime 
of the delta system.
+ *
+ * <p>Since there are too many interfaces and methods in this class, we use 
code region to help
+ * manage code. The code region starts with //region and ends with 
//endregion. When using Intellij
+ * Idea to develop, it's easy to fold the code region and see code region 
overview by collapsing
+ * all.
+ *
+ * <p>The codes are divided into the following code regions:
+ *
+ * <ol>
+ *   <li>MManager Singleton
+ *   <li>Interfaces and Implementation of MManager 
initialization、snapshot、recover and clear
+ *   <li>Interfaces for CQ
+ *   <li>Interfaces and Implementation for Timeseries operation
+ *   <li>Interfaces and Implementation for StorageGroup and TTL operation
+ *   <li>Interfaces for get and auto create device
+ *   <li>Interfaces for metadata info Query
+ *       <ol>
+ *         <li>Interfaces for metadata count
+ *         <li>Interfaces for level Node info Query
+ *         <li>Interfaces for StorageGroup and TTL info Query
+ *         <li>Interfaces for Entity/Device info Query
+ *         <li>Interfaces for timeseries, measurement and schema info Query
+ *       </ol>
+ *   <li>Interfaces and methods for MNode query
+ *   <li>Interfaces for alias and tag/attribute operations
+ *   <li>Interfaces only for Cluster module usage
+ *   <li>Interfaces for lastCache operations
+ *   <li>Interfaces and Implementation for InsertPlan process
+ *   <li>Interfaces and Implementation for Template operations
+ *   <li>TestOnly Interfaces
+ * </ol>
+ */
+public interface IMetaManager {
+
+  // region Interfaces and Implementation of MManager 
initialization、snapshot、recover and clear
+  void init();
+
+  void createMTreeSnapshot();
+
+  void clear();
+
+  void operation(PhysicalPlan plan) throws IOException, MetadataException;
+  // endregion
+
+  // region Interfaces for CQ
+  void createContinuousQuery(CreateContinuousQueryPlan plan) throws 
MetadataException;
+
+  void dropContinuousQuery(DropContinuousQueryPlan plan) throws 
MetadataException;
+
+  void writeCreateContinuousQueryLog(CreateContinuousQueryPlan plan) throws 
IOException;
+
+  void writeDropContinuousQueryLog(DropContinuousQueryPlan plan) throws 
IOException;
+  // endregion
+
+  // region Interfaces and Implementation for Timeseries operation
+  // including create and delete
+  void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException;
+
+  void createTimeseries(
+      PartialPath path,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props)
+      throws MetadataException;
+
+  void createAlignedTimeSeries(
+      PartialPath prefixPath,
+      List<String> measurements,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors)
+      throws MetadataException;
+
+  void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws 
MetadataException;
+
+  String deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch) 
throws MetadataException;
+
+  String deleteTimeseries(PartialPath pathPattern) throws MetadataException;
+  // endregion
+
+  // region Interfaces and Implementation for StorageGroup and TTL operation
+  // including sg set and delete, and ttl set
+  void setStorageGroup(PartialPath storageGroup) throws MetadataException;
+
+  void deleteStorageGroups(List<PartialPath> storageGroups) throws 
MetadataException;
+
+  void setTTL(PartialPath storageGroup, long dataTTL) throws 
MetadataException, IOException;
+  // endregion
+
+  // region Interfaces for get and auto create device
+  // endregion

Review comment:
       delete

##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
##########
@@ -0,0 +1,2551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.rocksdb;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
+import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MNodeTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.metadata.IMetaManager;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.rocksdb.mnode.REntityMNode;
+import org.apache.iotdb.db.metadata.rocksdb.mnode.RMeasurementMNode;
+import org.apache.iotdb.db.metadata.rocksdb.mnode.RStorageGroupMNode;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+
+import com.google.common.collect.MapMaker;
+import io.netty.util.internal.StringUtil;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.rocksdb.Holder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ALL_NODE_TYPE_ARRAY;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
+import static 
org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+/**
+ * This class takes the responsibility of serialization of all the metadata 
info and persistent it
+ * into files. This class contains all the interfaces to modify the metadata 
for delta system. All
+ * the operations will be insert into the logs temporary in case the downtime 
of the delta system.
+ *
+ * <p>Since there are too many interfaces and methods in this class, we use 
code region to help
+ * manage code. The code region starts with //region and ends with 
//endregion. When using Intellij
+ * Idea to develop, it's easy to fold the code region and see code region 
overview by collapsing
+ * all.
+ *
+ * <p>The codes are divided into the following code regions:
+ *
+ * <ol>
+ *   <li>MManager Singleton
+ *   <li>Interfaces and Implementation of MManager 
initialization、snapshot、recover and clear
+ *   <li>Interfaces for CQ
+ *   <li>Interfaces and Implementation for Timeseries operation
+ *   <li>Interfaces and Implementation for StorageGroup and TTL operation
+ *   <li>Interfaces for get and auto create device
+ *   <li>Interfaces for metadata info Query
+ *       <ol>
+ *         <li>Interfaces for metadata count
+ *         <li>Interfaces for level Node info Query
+ *         <li>Interfaces for StorageGroup and TTL info Query
+ *         <li>Interfaces for Entity/Device info Query
+ *         <li>Interfaces for timeseries, measurement and schema info Query
+ *       </ol>
+ *   <li>Interfaces and methods for MNode query
+ *   <li>Interfaces for alias and tag/attribute operations
+ *   <li>Interfaces only for Cluster module usage
+ *   <li>Interfaces for lastCache operations
+ *   <li>Interfaces and Implementation for InsertPlan process
+ *   <li>Interfaces and Implementation for Template operations
+ *   <li>TestOnly Interfaces
+ * </ol>
+ */
+public class MRocksDBManager implements IMetaManager {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MRocksDBManager.class);
+
+  protected static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  // TODO: make it configurable
+  public static int MAX_PATH_DEPTH = 10;
+
+  private static final long MAX_LOCK_WAIT_TIME = 50;
+
+  private RocksDBReadWriteHandler readWriteHandler;
+
+  private final Map<String, ReentrantLock> locksPool =
+      new MapMaker().weakValues().initialCapacity(10000).makeMap();
+
+  private volatile Map<String, Boolean> storageGroupDeletingFlagMap = new 
ConcurrentHashMap<>();

Review comment:
       The ConcurrentHashMap implementation ensures visibility and does not 
need to use volatile.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/qp/executor/PlanExecutor.java
##########
@@ -187,41 +187,11 @@
 import java.util.concurrent.Future;
 import java.util.concurrent.ThreadPoolExecutor;
 
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_NODES;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CHILD_PATHS;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COLUMN;
-import static 
org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_BOUNDARY;
-import static 
org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_EVERY_INTERVAL;
-import static 
org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_FOR_INTERVAL;
-import static 
org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_NAME;
-import static 
org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_QUERY_SQL;
-import static 
org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_CONTINUOUS_QUERY_TARGET_PATH;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_COUNT;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_DEVICES;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_CLASS;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_NAME;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_FUNCTION_TYPE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ITEM;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_LOCK_INFO;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_PRIVILEGE;
-import static org.apache.iotdb.db.conf.IoTDBConstant.COLUMN_ROLE;
+import static org.apache.iotdb.db.conf.IoTDBConstant.*;

Review comment:
       import *

##########
File path: 
cluster/src/test/java/org/apache/iotdb/cluster/server/member/MetaGroupMemberTest.java
##########
@@ -141,16 +140,9 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 
-import static org.apache.iotdb.cluster.server.NodeCharacter.ELECTOR;
-import static org.apache.iotdb.cluster.server.NodeCharacter.FOLLOWER;
-import static org.apache.iotdb.cluster.server.NodeCharacter.LEADER;
+import static org.apache.iotdb.cluster.server.NodeCharacter.*;

Review comment:
       does not import *

##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/RocksDBReadWriteHandler.java
##########
@@ -0,0 +1,570 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.rocksdb;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.MeasurementMNode;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import com.google.common.primitives.Bytes;
+import org.rocksdb.ColumnFamilyDescriptor;
+import org.rocksdb.ColumnFamilyHandle;
+import org.rocksdb.ColumnFamilyOptions;
+import org.rocksdb.DBOptions;
+import org.rocksdb.Holder;
+import org.rocksdb.InfoLogLevel;
+import org.rocksdb.LRUCache;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_ORIGIN_KEY;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_BLOCK_TYPE_SCHEMA;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DATA_VERSION;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_FLAG;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ROOT;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.PATH_SEPARATOR;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ROOT;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
+
+public class RocksDBReadWriteHandler {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(RocksDBReadWriteHandler.class);
+
+  protected static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  private static final String ROCKSDB_FOLDER = "rocksdb-schema";
+
+  private static final String[] INNER_TABLES =
+      new String[] {new String(RocksDB.DEFAULT_COLUMN_FAMILY), 
TABLE_NAME_TAGS};
+
+  public static final String ROCKSDB_PATH = config.getSystemDir() + 
File.separator + ROCKSDB_FOLDER;
+
+  private RocksDB rocksDB;
+
+  private static volatile RocksDBReadWriteHandler readWriteHandler;
+
+  ConcurrentMap<String, ColumnFamilyHandle> columnFamilyHandleMap = new 
ConcurrentHashMap<>();
+  List<ColumnFamilyDescriptor> columnFamilyDescriptors = new ArrayList<>();
+  List<ColumnFamilyHandle> columnFamilyHandles = new ArrayList<>();
+
+  static {
+    RocksDB.loadLibrary();
+  }
+
+  public RocksDBReadWriteHandler() throws RocksDBException {
+    Options options = new Options();
+    options.setCreateIfMissing(true);
+    options.setAllowMmapReads(true);
+    options.setRowCache(new LRUCache(9000000));
+    options.setDbWriteBufferSize(16 * 1024 * 1024);
+
+    org.rocksdb.Logger rocksDBLogger = new RockDBLogger(options, logger);
+    rocksDBLogger.setInfoLogLevel(InfoLogLevel.ERROR_LEVEL);
+    options.setLogger(rocksDBLogger);
+
+    DBOptions dbOptions = new DBOptions(options);
+
+    initColumnFamilyDescriptors(options);
+
+    rocksDB = RocksDB.open(dbOptions, ROCKSDB_PATH, columnFamilyDescriptors, 
columnFamilyHandles);
+
+    initInnerColumnFamilies();
+
+    initRootKey();
+  }
+
+  private void initColumnFamilyDescriptors(Options options) throws 
RocksDBException {
+    List<byte[]> cfs = RocksDB.listColumnFamilies(options, ROCKSDB_PATH);
+    if (cfs == null || cfs.size() <= 0) {
+      cfs = new ArrayList<>();
+      cfs.add(RocksDB.DEFAULT_COLUMN_FAMILY);
+    }
+
+    for (byte[] tableBytes : cfs) {
+      columnFamilyDescriptors.add(
+          new ColumnFamilyDescriptor(tableBytes, new ColumnFamilyOptions()));
+    }
+  }
+
+  private void initInnerColumnFamilies() throws RocksDBException {
+    for (String tableNames : INNER_TABLES) {
+      boolean tableCreated = false;
+      for (ColumnFamilyHandle cfh : columnFamilyHandles) {
+        if (tableNames.equals(new String(cfh.getName()))) {
+          tableCreated = true;
+          break;
+        }
+      }
+      if (!tableCreated) {
+        createTable(tableNames);
+      }
+    }
+    for (ColumnFamilyHandle handle : columnFamilyHandles) {
+      columnFamilyHandleMap.put(new String(handle.getName()), handle);
+    }
+  }
+
+  private void initRootKey() throws RocksDBException {
+    byte[] rootKey = RocksDBUtils.toRocksDBKey(ROOT, NODE_TYPE_ROOT);
+    if (!keyExist(rootKey)) {
+      rocksDB.put(rootKey, new byte[] {DATA_VERSION, DEFAULT_FLAG});
+    }
+  }
+
+  private void createTable(String tableName) throws RocksDBException {
+    ColumnFamilyHandle columnFamilyHandle =
+        rocksDB.createColumnFamily(
+            new ColumnFamilyDescriptor(tableName.getBytes(), new 
ColumnFamilyOptions()));
+    columnFamilyDescriptors.add(
+        new ColumnFamilyDescriptor(tableName.getBytes(), new 
ColumnFamilyOptions()));
+    columnFamilyHandles.add(columnFamilyHandle);
+  }
+
+  public ColumnFamilyHandle getCFHByName(String columnFamilyName) {
+    return columnFamilyHandleMap.get(columnFamilyName);
+  }
+
+  public void updateNode(byte[] key, byte[] value) throws RocksDBException {
+    rocksDB.put(key, value);
+  }
+
+  public void createNode(String levelKey, RocksDBMNodeType type, byte[] value)
+      throws RocksDBException {
+    byte[] nodeKey = RocksDBUtils.toRocksDBKey(levelKey, type.value);
+    rocksDB.put(nodeKey, value);
+  }
+
+  public void createNode(byte[] nodeKey, byte[] value) throws RocksDBException 
{
+    rocksDB.put(nodeKey, value);
+  }
+
+  public void convertToEntityNode(String levelPath, byte[] value) throws 
RocksDBException {
+    WriteBatch batch = new WriteBatch();
+    byte[] internalKey = RocksDBUtils.toInternalNodeKey(levelPath);
+    byte[] entityKey = RocksDBUtils.toEntityNodeKey(levelPath);
+    batch.delete(internalKey);
+    batch.put(entityKey, value);
+    executeBatch(batch);
+  }
+
+  public IMeasurementMNode getMeasurementMNode(PartialPath fullPath) throws 
MetadataException {
+    String[] nodes = fullPath.getNodes();
+    String key = RocksDBUtils.getLevelPath(nodes, nodes.length - 1);
+    try {
+      byte[] value = rocksDB.get(key.getBytes());
+      if (value == null) {
+        logger.warn("path not exist: {}", key);
+        throw new MetadataException("key not exist");
+      }
+      IMeasurementMNode node = new MeasurementMNode(null, 
fullPath.getFullPath(), null, null);
+      return node;
+    } catch (RocksDBException e) {
+      throw new MetadataException(e);
+    }
+  }
+
+  public long countNodesNum(String tableName) {
+    ColumnFamilyHandle columnFamilyHandle = 
columnFamilyHandleMap.get(tableName);
+    RocksIterator iter = rocksDB.newIterator(columnFamilyHandle);
+    long count = 0;
+    for (iter.seekToFirst(); iter.isValid(); iter.next()) {
+      count++;
+    }
+    return count;
+  }
+
+  public boolean keyExistByType(String levelKey, RocksDBMNodeType type) throws 
RocksDBException {
+    return keyExistByType(levelKey, type, new Holder<>());
+  }
+
+  public boolean keyExistByType(String levelKey, RocksDBMNodeType type, 
Holder<byte[]> holder)
+      throws RocksDBException {
+    byte[] key = RocksDBUtils.toRocksDBKey(levelKey, type.value);
+    return keyExist(key, holder);
+  }
+
+  public CheckKeyResult keyExistByAllTypes(String levelKey) throws 
RocksDBException {
+    RocksDBMNodeType[] types =
+        new RocksDBMNodeType[] {
+          RocksDBMNodeType.ALISA,
+          RocksDBMNodeType.ENTITY,
+          RocksDBMNodeType.INTERNAL,
+          RocksDBMNodeType.MEASUREMENT,
+          RocksDBMNodeType.STORAGE_GROUP
+        };
+    return keyExistByTypes(levelKey, types);
+  }
+
+  public CheckKeyResult keyExistByTypes(String levelKey, RocksDBMNodeType... 
types)
+      throws RocksDBException {
+    CheckKeyResult result = new CheckKeyResult();
+    try {
+      Arrays.stream(types)
+          //          .parallel()
+          .forEach(
+              x -> {
+                byte[] key = Bytes.concat(new byte[] {(byte) x.value}, 
levelKey.getBytes());
+                try {
+                  Holder<byte[]> holder = new Holder<>();
+                  boolean keyExisted = keyExist(key, holder);
+                  if (keyExisted) {
+                    result.setExistType(x.value);
+                    result.setValue(holder.getValue());
+                  }
+                } catch (RocksDBException e) {
+                  throw new RuntimeException(e);
+                }
+              });
+    } catch (Exception e) {
+      if (e.getCause() instanceof RocksDBException) {
+        throw (RocksDBException) e.getCause();
+      }
+      throw e;
+    }
+    return result;
+  }
+
+  public boolean keyExist(byte[] key, Holder<byte[]> holder) throws 
RocksDBException {
+    boolean exist = false;
+    if (!rocksDB.keyMayExist(key, holder)) {
+      exist = false;
+    } else {
+      if (holder.getValue() == null) {
+        byte[] value = rocksDB.get(key);
+        if (value != null) {
+          exist = true;
+          holder.setValue(value);
+        }
+      } else {
+        exist = true;
+      }
+    }
+    return exist;
+  }
+
+  public boolean keyExist(byte[] key) throws RocksDBException {
+    return keyExist(key, new Holder<>());
+  }
+
+  public boolean keyExist(String key, Holder<byte[]> holder) throws 
RocksDBException {
+    return keyExist(key.getBytes(), holder);
+  }
+
+  public boolean keyExist(String key) throws RocksDBException {
+    return keyExist(key, new Holder<>());
+  }
+
+  public void scanAllKeysRecursively(Set<String> seeds, int level, 
Function<String, Boolean> op) {
+    if (seeds == null || seeds.isEmpty()) {
+      return;
+    }
+    Set<String> children = ConcurrentHashMap.newKeySet();
+    seeds
+        .parallelStream()
+        .forEach(
+            x -> {
+              if (op.apply(x)) {
+                // x is not leaf node
+                String childrenPrefix = RocksDBUtils.getNextLevelOfPath(x, 
level);
+                children.addAll(getAllByPrefix(childrenPrefix));
+              }
+            });
+    if (!children.isEmpty()) {
+      scanAllKeysRecursively(children, level + 1, op);
+    }
+  }
+
+  public Set<String> getAllByPrefix(String prefix) {
+    Set<String> result = new HashSet<>();
+    byte[] prefixKey = prefix.getBytes();
+    RocksIterator iterator = rocksDB.newIterator();
+    for (iterator.seek(prefixKey); iterator.isValid(); iterator.next()) {
+      String key = new String(iterator.key());
+      if (!key.startsWith(prefix)) {
+        break;
+      }
+      result.add(key);
+    }
+    //    System.out.println(prefix + " " + result.size());

Review comment:
       delete log

##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.rocksdb;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
+import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.logfile.MLogReader;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.mtree.MTree;
+import 
org.apache.iotdb.db.metadata.mtree.traverser.collector.MeasurementCollector;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MetaDataTransfer {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MetaDataTransfer.class);
+
+  private static int DEFAULT_TRANSFER_THREAD_POOL_SIZE = 200;
+  private static int DEFAULT_TRANSFER_PLANS_BUFFER_SIZE = 100_000;
+
+  private ForkJoinPool forkJoinPool = new 
ForkJoinPool(DEFAULT_TRANSFER_THREAD_POOL_SIZE);
+
+  private String mtreeSnapshotPath;
+  private MRocksDBManager rocksDBManager;
+  private MLogWriter mLogWriter;
+  private String failedMLogPath =
+      IoTDBDescriptor.getInstance().getConfig().getSchemaDir()
+          + File.separator
+          + MetadataConstant.METADATA_LOG
+          + ".transfer_failed";
+
+  private String idxFilePath =
+      RocksDBReadWriteHandler.ROCKSDB_PATH + File.separator + 
"transfer_mlog.idx";
+
+  private AtomicInteger failedPlanCount = new AtomicInteger(0);
+  private List<PhysicalPlan> retryPlans = new ArrayList<>();
+
+  MetaDataTransfer() throws MetadataException {
+    rocksDBManager = new MRocksDBManager();
+  }
+
+  public static void main(String[] args) {
+    try {
+      MetaDataTransfer transfer = new MetaDataTransfer();
+      transfer.doTransfer();
+    } catch (MetadataException | IOException | ExecutionException | 
InterruptedException e) {
+      e.printStackTrace();
+    }
+  }
+
+  public void doTransfer() throws IOException, ExecutionException, 
InterruptedException {
+    File failedFile = new File(failedMLogPath);
+    if (failedFile.exists()) {
+      failedFile.delete();
+    }
+
+    mLogWriter = new MLogWriter(failedMLogPath);
+    mLogWriter.setLogNum(0);
+
+    String schemaDir = 
IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
+    File schemaFolder = SystemFileFactory.INSTANCE.getFile(schemaDir);
+    if (!schemaFolder.exists()) {
+      if (schemaFolder.mkdirs()) {
+        logger.info("create system folder {}", schemaFolder.getAbsolutePath());
+      } else {
+        logger.info("create system folder {} failed.", 
schemaFolder.getAbsolutePath());
+      }
+    }
+
+    mtreeSnapshotPath = schemaDir + File.separator + 
MetadataConstant.MTREE_SNAPSHOT;
+    File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
+    if (mtreeSnapshot.exists()) {
+      try {
+        doTransferFromSnapshot();
+      } catch (MetadataException e) {
+        logger.error("Fatal error, terminate data transfer!!!", e);
+      }
+    }
+
+    String logFilePath = schemaDir + File.separator + 
MetadataConstant.METADATA_LOG;
+    File logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    // init the metadata from the operation log
+    if (logFile.exists()) {
+      try (MLogReader mLogReader = new MLogReader(schemaDir, 
MetadataConstant.METADATA_LOG)) {
+        int startIdx = 0;
+        File idxFile = new File(idxFilePath);
+        if (idxFile.exists()) {
+          try (BufferedReader br = new BufferedReader(new 
FileReader(idxFile))) {
+            String idxStr = br.readLine();
+            if (StringUtils.isNotEmpty(idxStr)) {
+              startIdx = Integer.valueOf(idxStr);
+            }
+          }
+        }
+        transferFromMLog(mLogReader, startIdx);
+      } catch (Exception e) {
+        throw new IOException("Failed to parser mlog.bin for err:" + e);
+      }
+    } else {
+      logger.info("No mlog.bin file find, skip data transfer");
+    }
+    mLogWriter.close();
+
+    logger.info("Transfer metadata from MManager to MRocksDBManager 
complete!");
+  }
+
+  private void transferFromMLog(MLogReader mLogReader, long startIdx)
+      throws IOException, MetadataException, ExecutionException, 
InterruptedException {
+    long time = System.currentTimeMillis();
+    logger.info("start from {} to transfer data from mlog.bin", startIdx);
+    int currentIdx = 0;
+    PhysicalPlan plan;
+    List<PhysicalPlan> nonCollisionCollections = new ArrayList<>();
+    while (mLogReader.hasNext()) {
+      try {
+        plan = mLogReader.next();
+        currentIdx++;
+        if (currentIdx <= startIdx) {
+          continue;
+        }
+      } catch (Exception e) {
+        logger.error("Parse mlog error at lineNumber {} because:", currentIdx, 
e);
+        throw e;
+      }
+      if (plan == null) {
+        continue;
+      }
+
+      switch (plan.getOperatorType()) {
+        case CREATE_TIMESERIES:
+        case CREATE_ALIGNED_TIMESERIES:
+        case AUTO_CREATE_DEVICE_MNODE:
+          nonCollisionCollections.add(plan);
+          if (nonCollisionCollections.size() > 
DEFAULT_TRANSFER_PLANS_BUFFER_SIZE) {
+            executeBufferedOperation(nonCollisionCollections);
+          }
+          break;
+        case SET_STORAGE_GROUP:
+        case TTL:
+        case CHANGE_ALIAS:
+        case DELETE_TIMESERIES:
+          executeBufferedOperation(nonCollisionCollections);
+          try {
+            rocksDBManager.operation(plan);
+          } catch (IOException e) {
+            rocksDBManager.operation(plan);
+          } catch (MetadataException e) {
+            logger.error("Can not operate cmd {} for err:", 
plan.getOperatorType(), e);
+          }
+          break;
+        case DELETE_STORAGE_GROUP:
+          DeleteStorageGroupPlan deleteStorageGroupPlan = 
(DeleteStorageGroupPlan) plan;
+          for (PartialPath path : deleteStorageGroupPlan.getPaths()) {
+            logger.info("delete storage group: {}", path.getFullPath());
+          }
+          break;
+        case CHANGE_TAG_OFFSET:
+        case CREATE_TEMPLATE:
+        case DROP_TEMPLATE:
+        case APPEND_TEMPLATE:
+        case PRUNE_TEMPLATE:
+        case SET_TEMPLATE:
+        case ACTIVATE_TEMPLATE:
+        case UNSET_TEMPLATE:
+        case CREATE_CONTINUOUS_QUERY:
+        case DROP_CONTINUOUS_QUERY:
+          logger.error("unsupported operations {}", plan.toString());
+          break;
+        default:
+          logger.error("Unrecognizable command {}", plan.getOperatorType());
+      }
+    }
+
+    executeBufferedOperation(nonCollisionCollections);
+
+    if (retryPlans.size() > 0) {
+      for (PhysicalPlan retryPlan : retryPlans) {
+        try {
+          rocksDBManager.operation(retryPlan);
+        } catch (IOException e) {
+          persistFailedLog(retryPlan);
+        } catch (MetadataException e) {
+          logger.error("Execute plan failed: {}", retryPlan.toString(), e);
+        } catch (Exception e) {
+          persistFailedLog(retryPlan);
+        }

Review comment:
       will IOException add Exception merged

##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MetaDataTransfer.java
##########
@@ -0,0 +1,458 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.rocksdb;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
+import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.logfile.MLogReader;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.mtree.MTree;
+import 
org.apache.iotdb.db.metadata.mtree.traverser.collector.MeasurementCollector;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class MetaDataTransfer {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MetaDataTransfer.class);
+
+  private static int DEFAULT_TRANSFER_THREAD_POOL_SIZE = 200;
+  private static int DEFAULT_TRANSFER_PLANS_BUFFER_SIZE = 100_000;
+
+  private ForkJoinPool forkJoinPool = new 
ForkJoinPool(DEFAULT_TRANSFER_THREAD_POOL_SIZE);
+
+  private String mtreeSnapshotPath;
+  private MRocksDBManager rocksDBManager;
+  private MLogWriter mLogWriter;
+  private String failedMLogPath =
+      IoTDBDescriptor.getInstance().getConfig().getSchemaDir()
+          + File.separator
+          + MetadataConstant.METADATA_LOG
+          + ".transfer_failed";
+
+  private String idxFilePath =
+      RocksDBReadWriteHandler.ROCKSDB_PATH + File.separator + 
"transfer_mlog.idx";
+
+  private AtomicInteger failedPlanCount = new AtomicInteger(0);
+  private List<PhysicalPlan> retryPlans = new ArrayList<>();
+
+  MetaDataTransfer() throws MetadataException {
+    rocksDBManager = new MRocksDBManager();
+  }
+
+  public static void main(String[] args) {
+    try {
+      MetaDataTransfer transfer = new MetaDataTransfer();
+      transfer.doTransfer();
+    } catch (MetadataException | IOException | ExecutionException | 
InterruptedException e) {
+      e.printStackTrace();

Review comment:
       please use log print ...

##########
File path: server/src/main/java/org/apache/iotdb/db/metadata/IMetaManager.java
##########
@@ -0,0 +1,383 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata;
+
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * This class takes the responsibility of serialization of all the metadata 
info and persistent it
+ * into files. This class contains all the interfaces to modify the metadata 
for delta system. All
+ * the operations will be insert into the logs temporary in case the downtime 
of the delta system.
+ *
+ * <p>Since there are too many interfaces and methods in this class, we use 
code region to help
+ * manage code. The code region starts with //region and ends with 
//endregion. When using Intellij
+ * Idea to develop, it's easy to fold the code region and see code region 
overview by collapsing
+ * all.
+ *
+ * <p>The codes are divided into the following code regions:
+ *
+ * <ol>
+ *   <li>MManager Singleton
+ *   <li>Interfaces and Implementation of MManager 
initialization、snapshot、recover and clear
+ *   <li>Interfaces for CQ
+ *   <li>Interfaces and Implementation for Timeseries operation
+ *   <li>Interfaces and Implementation for StorageGroup and TTL operation
+ *   <li>Interfaces for get and auto create device
+ *   <li>Interfaces for metadata info Query
+ *       <ol>
+ *         <li>Interfaces for metadata count
+ *         <li>Interfaces for level Node info Query
+ *         <li>Interfaces for StorageGroup and TTL info Query
+ *         <li>Interfaces for Entity/Device info Query
+ *         <li>Interfaces for timeseries, measurement and schema info Query
+ *       </ol>
+ *   <li>Interfaces and methods for MNode query
+ *   <li>Interfaces for alias and tag/attribute operations
+ *   <li>Interfaces only for Cluster module usage
+ *   <li>Interfaces for lastCache operations
+ *   <li>Interfaces and Implementation for InsertPlan process
+ *   <li>Interfaces and Implementation for Template operations
+ *   <li>TestOnly Interfaces
+ * </ol>
+ */
+public interface IMetaManager {
+
+  // region Interfaces and Implementation of MManager 
initialization、snapshot、recover and clear
+  void init();
+
+  void createMTreeSnapshot();
+
+  void clear();
+
+  void operation(PhysicalPlan plan) throws IOException, MetadataException;
+  // endregion
+
+  // region Interfaces for CQ
+  void createContinuousQuery(CreateContinuousQueryPlan plan) throws 
MetadataException;
+
+  void dropContinuousQuery(DropContinuousQueryPlan plan) throws 
MetadataException;
+
+  void writeCreateContinuousQueryLog(CreateContinuousQueryPlan plan) throws 
IOException;
+
+  void writeDropContinuousQueryLog(DropContinuousQueryPlan plan) throws 
IOException;
+  // endregion
+
+  // region Interfaces and Implementation for Timeseries operation
+  // including create and delete
+  void createTimeseries(CreateTimeSeriesPlan plan) throws MetadataException;
+
+  void createTimeseries(
+      PartialPath path,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props)
+      throws MetadataException;
+
+  void createAlignedTimeSeries(
+      PartialPath prefixPath,
+      List<String> measurements,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors)
+      throws MetadataException;
+
+  void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws 
MetadataException;
+
+  String deleteTimeseries(PartialPath pathPattern, boolean isPrefixMatch) 
throws MetadataException;
+
+  String deleteTimeseries(PartialPath pathPattern) throws MetadataException;
+  // endregion

Review comment:
       delete

##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/rocksdb/MRocksDBManager.java
##########
@@ -0,0 +1,2551 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.rocksdb;
+
+import org.apache.iotdb.db.conf.IoTDBConfig;
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
+import org.apache.iotdb.db.exception.metadata.AliasAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.AlignedTimeseriesException;
+import org.apache.iotdb.db.exception.metadata.DataTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.IllegalPathException;
+import org.apache.iotdb.db.exception.metadata.MNodeTypeMismatchException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.exception.metadata.PathAlreadyExistException;
+import org.apache.iotdb.db.exception.metadata.PathNotExistException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupAlreadySetException;
+import org.apache.iotdb.db.exception.metadata.StorageGroupNotSetException;
+import org.apache.iotdb.db.metadata.IMetaManager;
+import org.apache.iotdb.db.metadata.MManager;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.metadata.rocksdb.mnode.REntityMNode;
+import org.apache.iotdb.db.metadata.rocksdb.mnode.RMeasurementMNode;
+import org.apache.iotdb.db.metadata.rocksdb.mnode.RStorageGroupMNode;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.db.metadata.utils.MetaFormatUtils;
+import org.apache.iotdb.db.metadata.utils.MetaUtils;
+import org.apache.iotdb.db.qp.constant.SQLConstant;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
+import org.apache.iotdb.db.qp.physical.crud.InsertTabletPlan;
+import org.apache.iotdb.db.qp.physical.sys.ActivateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AppendTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropContinuousQueryPlan;
+import org.apache.iotdb.db.qp.physical.sys.DropTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.PruneTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTemplatePlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowDevicesPlan;
+import org.apache.iotdb.db.qp.physical.sys.ShowTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.UnsetTemplatePlan;
+import org.apache.iotdb.db.query.context.QueryContext;
+import org.apache.iotdb.db.query.dataset.ShowDevicesResult;
+import org.apache.iotdb.db.query.dataset.ShowTimeSeriesResult;
+import org.apache.iotdb.db.service.IoTDB;
+import org.apache.iotdb.db.utils.SchemaUtils;
+import org.apache.iotdb.db.utils.TestOnly;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.iotdb.tsfile.read.TimeValuePair;
+import org.apache.iotdb.tsfile.utils.Pair;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.TimeseriesSchema;
+
+import com.google.common.collect.MapMaker;
+import io.netty.util.internal.StringUtil;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.rocksdb.Holder;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Queue;
+import java.util.Set;
+import java.util.Stack;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.function.BiFunction;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+
+import static org.apache.iotdb.db.conf.IoTDBConstant.MULTI_LEVEL_PATH_WILDCARD;
+import static org.apache.iotdb.db.conf.IoTDBConstant.ONE_LEVEL_PATH_WILDCARD;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ALL_NODE_TYPE_ARRAY;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_ALIGNED_ENTITY_VALUE;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.DEFAULT_NODE_VALUE;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.FLAG_IS_ALIGNED;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_ENTITY;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_MEASUREMENT;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.NODE_TYPE_SG;
+import static 
org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.TABLE_NAME_TAGS;
+import static org.apache.iotdb.db.metadata.rocksdb.RockDBConstants.ZERO;
+import static 
org.apache.iotdb.tsfile.common.constant.TsFileConstant.PATH_SEPARATOR;
+
+/**
+ * This class takes the responsibility of serialization of all the metadata 
info and persistent it
+ * into files. This class contains all the interfaces to modify the metadata 
for delta system. All
+ * the operations will be insert into the logs temporary in case the downtime 
of the delta system.
+ *
+ * <p>Since there are too many interfaces and methods in this class, we use 
code region to help
+ * manage code. The code region starts with //region and ends with 
//endregion. When using Intellij
+ * Idea to develop, it's easy to fold the code region and see code region 
overview by collapsing
+ * all.
+ *
+ * <p>The codes are divided into the following code regions:
+ *
+ * <ol>
+ *   <li>MManager Singleton
+ *   <li>Interfaces and Implementation of MManager 
initialization、snapshot、recover and clear
+ *   <li>Interfaces for CQ
+ *   <li>Interfaces and Implementation for Timeseries operation
+ *   <li>Interfaces and Implementation for StorageGroup and TTL operation
+ *   <li>Interfaces for get and auto create device
+ *   <li>Interfaces for metadata info Query
+ *       <ol>
+ *         <li>Interfaces for metadata count
+ *         <li>Interfaces for level Node info Query
+ *         <li>Interfaces for StorageGroup and TTL info Query
+ *         <li>Interfaces for Entity/Device info Query
+ *         <li>Interfaces for timeseries, measurement and schema info Query
+ *       </ol>
+ *   <li>Interfaces and methods for MNode query
+ *   <li>Interfaces for alias and tag/attribute operations
+ *   <li>Interfaces only for Cluster module usage
+ *   <li>Interfaces for lastCache operations
+ *   <li>Interfaces and Implementation for InsertPlan process
+ *   <li>Interfaces and Implementation for Template operations
+ *   <li>TestOnly Interfaces
+ * </ol>
+ */
+public class MRocksDBManager implements IMetaManager {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(MRocksDBManager.class);
+
+  protected static IoTDBConfig config = 
IoTDBDescriptor.getInstance().getConfig();
+
+  // TODO: make it configurable
+  public static int MAX_PATH_DEPTH = 10;
+
+  private static final long MAX_LOCK_WAIT_TIME = 50;
+
+  private RocksDBReadWriteHandler readWriteHandler;
+
+  private final Map<String, ReentrantLock> locksPool =
+      new MapMaker().weakValues().initialCapacity(10000).makeMap();
+
+  private volatile Map<String, Boolean> storageGroupDeletingFlagMap = new 
ConcurrentHashMap<>();
+
+  public MRocksDBManager() throws MetadataException {
+    try {
+      readWriteHandler = RocksDBReadWriteHandler.getInstance();
+    } catch (RocksDBException e) {
+      logger.error("create RocksDBReadWriteHandler fail", e);
+      throw new MetadataException(e);
+    }
+  }
+
+  // region Interfaces and Implementation of MManager 
initialization、snapshot、recover and clear
+  @Override
+  public void init() {}
+
+  @Override
+  public void createMTreeSnapshot() {
+    throw new UnsupportedOperationException(
+        "RocksDB based MetaData Manager doesn't support the operation");
+  }
+
+  @TestOnly
+  @Override
+  public void clear() {}
+
+  @Override
+  public void operation(PhysicalPlan plan) throws IOException, 
MetadataException {
+    switch (plan.getOperatorType()) {
+      case CREATE_TIMESERIES:
+        CreateTimeSeriesPlan createTimeSeriesPlan = (CreateTimeSeriesPlan) 
plan;
+        createTimeseries(createTimeSeriesPlan);
+        break;
+      case CREATE_ALIGNED_TIMESERIES:
+        CreateAlignedTimeSeriesPlan createAlignedTimeSeriesPlan =
+            (CreateAlignedTimeSeriesPlan) plan;
+        createAlignedTimeSeries(createAlignedTimeSeriesPlan);
+        break;
+      case DELETE_TIMESERIES:
+        DeleteTimeSeriesPlan deleteTimeSeriesPlan = (DeleteTimeSeriesPlan) 
plan;
+        // cause we only has one path for one DeleteTimeSeriesPlan
+        deleteTimeseries(deleteTimeSeriesPlan.getPaths().get(0));
+        break;
+      case SET_STORAGE_GROUP:
+        SetStorageGroupPlan setStorageGroupPlan = (SetStorageGroupPlan) plan;
+        setStorageGroup(setStorageGroupPlan.getPath());
+        break;
+      case DELETE_STORAGE_GROUP:
+        DeleteStorageGroupPlan deleteStorageGroupPlan = 
(DeleteStorageGroupPlan) plan;
+        deleteStorageGroups(deleteStorageGroupPlan.getPaths());
+        break;
+      case TTL:
+        SetTTLPlan setTTLPlan = (SetTTLPlan) plan;
+        setTTL(setTTLPlan.getStorageGroup(), setTTLPlan.getDataTTL());
+        break;
+      case CHANGE_ALIAS:
+        ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
+        changeAlias(changeAliasPlan.getPath(), changeAliasPlan.getAlias());
+        break;
+      case AUTO_CREATE_DEVICE_MNODE:
+        AutoCreateDeviceMNodePlan autoCreateDeviceMNodePlan = 
(AutoCreateDeviceMNodePlan) plan;
+        autoCreateDeviceMNode(autoCreateDeviceMNodePlan);
+        break;
+      case CHANGE_TAG_OFFSET:
+      case CREATE_TEMPLATE:
+      case DROP_TEMPLATE:
+      case APPEND_TEMPLATE:
+      case PRUNE_TEMPLATE:
+      case SET_TEMPLATE:
+      case ACTIVATE_TEMPLATE:
+      case UNSET_TEMPLATE:
+      case CREATE_CONTINUOUS_QUERY:
+      case DROP_CONTINUOUS_QUERY:
+        logger.error("unsupported operations {}", plan.toString());
+        break;
+      default:
+        logger.error("Unrecognizable command {}", plan.getOperatorType());
+    }
+  }
+  // endregion
+
+  // region Interfaces for CQ
+  @Override
+  public void createContinuousQuery(CreateContinuousQueryPlan plan) throws 
MetadataException {}
+
+  @Override
+  public void dropContinuousQuery(DropContinuousQueryPlan plan) throws 
MetadataException {}
+
+  @Override
+  public void writeCreateContinuousQueryLog(CreateContinuousQueryPlan plan) 
throws IOException {}
+
+  @Override
+  public void writeDropContinuousQueryLog(DropContinuousQueryPlan plan) throws 
IOException {}
+  // endregion
+
+  // region Interfaces and Implementation for Timeseries operation
+  // including create and delete
+  @Override
+  public void createTimeseries(CreateTimeSeriesPlan plan) throws 
MetadataException {
+    createTimeSeries(
+        plan.getPath(),
+        new MeasurementSchema(
+            plan.getPath().getMeasurement(),
+            plan.getDataType(),
+            plan.getEncoding(),
+            plan.getCompressor(),
+            plan.getProps()),
+        plan.getAlias(),
+        plan.getTags(),
+        plan.getAttributes());
+  }
+
+  /**
+   * Add one timeseries to metadata tree, if the timeseries already exists, 
throw exception
+   *
+   * @param path the timeseries path
+   * @param dataType the dateType {@code DataType} of the timeseries
+   * @param encoding the encoding function {@code Encoding} of the timeseries
+   * @param compressor the compressor function {@code Compressor} of the time 
series
+   */
+  @Override
+  public void createTimeseries(
+      PartialPath path,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props)
+      throws MetadataException {
+    createTimeseries(path, dataType, encoding, compressor, props, null);
+  }
+
+  /**
+   * Add one timeseries to metadata, if the timeseries already exists, throw 
exception
+   *
+   * @param path the timeseries path
+   * @param dataType the dateType {@code DataType} of the timeseries
+   * @param encoding the encoding function {@code Encoding} of the timeseries
+   * @param compressor the compressor function {@code Compressor} of the time 
series
+   */
+  public void createTimeseries(
+      PartialPath path,
+      TSDataType dataType,
+      TSEncoding encoding,
+      CompressionType compressor,
+      Map<String, String> props,
+      String alias)
+      throws MetadataException {
+    createTimeSeries(
+        path,
+        new MeasurementSchema(path.getMeasurement(), dataType, encoding, 
compressor, props),
+        alias,
+        null,
+        null);
+  }
+
+  protected void createTimeSeries(
+      PartialPath path,
+      IMeasurementSchema schema,
+      String alias,
+      Map<String, String> tags,
+      Map<String, String> attributes)
+      throws MetadataException {
+    // regular check
+    if (path.getNodes().length > MRocksDBManager.MAX_PATH_DEPTH) {
+      throw new IllegalPathException(
+          String.format(
+              "path is too long, provide: %d, max: %d",
+              path.getNodeLength(), MRocksDBManager.MAX_PATH_DEPTH));
+    }
+    MetaFormatUtils.checkTimeseries(path);
+    MetaFormatUtils.checkTimeseriesProps(path.getFullPath(), 
schema.getProps());
+
+    // sg check and create
+    String[] nodes = path.getNodes();
+    SchemaUtils.checkDataTypeWithEncoding(schema.getType(), 
schema.getEncodingType());
+    int sgIndex = ensureStorageGroup(path);
+
+    try {
+      createTimeSeriesRecursively(
+          nodes, nodes.length, sgIndex, schema, alias, tags, attributes, new 
Stack<>());
+      // TODO: load tags to memory
+    } catch (RocksDBException | InterruptedException | IOException e) {
+      throw new MetadataException(e);
+    }
+  }
+
+  private void createTimeSeriesRecursively(
+      String nodes[],
+      int start,
+      int end,
+      IMeasurementSchema schema,
+      String alias,
+      Map<String, String> tags,
+      Map<String, String> attributes,
+      Stack<Lock> lockedLocks)
+      throws InterruptedException, MetadataException, RocksDBException, 
IOException {
+    if (start <= end) {
+      // nodes "root" must exist and don't need to check
+      return;
+    }
+    String levelPath = RocksDBUtils.getLevelPath(nodes, start - 1);
+    Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+    if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+      lockedLocks.push(lock);
+      try {
+        CheckKeyResult checkResult = 
readWriteHandler.keyExistByAllTypes(levelPath);
+        if (!checkResult.existAnyKey()) {
+          createTimeSeriesRecursively(
+              nodes, start - 1, end, schema, alias, tags, attributes, 
lockedLocks);
+          if (start == nodes.length) {
+            createTimeSeriesNode(nodes, levelPath, schema, alias, tags, 
attributes);
+          } else if (start == nodes.length - 1) {
+            readWriteHandler.createNode(levelPath, RocksDBMNodeType.ENTITY, 
DEFAULT_NODE_VALUE);
+          } else {
+            readWriteHandler.createNode(levelPath, RocksDBMNodeType.INTERNAL, 
DEFAULT_NODE_VALUE);
+          }
+        } else {
+          if (start == nodes.length) {
+            throw new PathAlreadyExistException(levelPath);
+          }
+
+          if (checkResult.getResult(RocksDBMNodeType.MEASUREMENT)
+              || checkResult.getResult(RocksDBMNodeType.ALISA)) {
+            throw new PathAlreadyExistException(levelPath);
+          }
+
+          if (start == nodes.length - 1) {
+            if (checkResult.getResult(RocksDBMNodeType.INTERNAL)) {
+              // convert the parent node to entity if it is internal node
+              readWriteHandler.convertToEntityNode(levelPath, 
DEFAULT_NODE_VALUE);
+            } else if (checkResult.getResult(RocksDBMNodeType.ENTITY)) {
+              if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
+                throw new AlignedTimeseriesException(
+                    "Timeseries under this entity is aligned, please use 
createAlignedTimeseries or change entity.",
+                    levelPath);
+              }
+            } else {
+              throw new MetadataException(
+                  "parent of measurement could only be entity or internal 
node");
+            }
+          }
+        }
+      } catch (Exception e) {
+        while (!lockedLocks.isEmpty()) {
+          lockedLocks.pop().unlock();
+        }
+        throw e;
+      } finally {
+        if (!lockedLocks.isEmpty()) {
+          lockedLocks.pop().unlock();
+        }
+      }
+    } else {
+      while (!lockedLocks.isEmpty()) {
+        lockedLocks.pop().unlock();
+      }
+      throw new AcquireLockTimeoutException("acquire lock timeout: " + 
levelPath);
+    }
+  }
+
+  private void createTimeSeriesNode(
+      String[] nodes,
+      String levelPath,
+      IMeasurementSchema schema,
+      String alias,
+      Map<String, String> tags,
+      Map<String, String> attributes)
+      throws IOException, RocksDBException, MetadataException, 
InterruptedException {
+    // create time-series node
+    WriteBatch batch = new WriteBatch();
+    byte[] value = RocksDBUtils.buildMeasurementNodeValue(schema, alias, tags, 
attributes);
+    byte[] measurementKey = RocksDBUtils.toMeasurementNodeKey(levelPath);
+    batch.put(measurementKey, value);
+
+    // measurement with tags will save in a separate table at the same time
+    if (tags != null && !tags.isEmpty()) {
+      batch.put(readWriteHandler.getCFHByName(TABLE_NAME_TAGS), 
measurementKey, DEFAULT_NODE_VALUE);
+    }
+
+    if (StringUtils.isNotEmpty(alias)) {
+      String[] aliasNodes = Arrays.copyOf(nodes, nodes.length);
+      aliasNodes[nodes.length - 1] = alias;
+      String aliasLevelPath = RocksDBUtils.getLevelPath(aliasNodes, 
aliasNodes.length - 1);
+      byte[] aliasNodeKey = RocksDBUtils.toAliasNodeKey(aliasLevelPath);
+      Lock lock = locksPool.computeIfAbsent(aliasLevelPath, x -> new 
ReentrantLock());
+      if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+        try {
+          if 
(!readWriteHandler.keyExistByAllTypes(aliasLevelPath).existAnyKey()) {
+            batch.put(aliasNodeKey, 
RocksDBUtils.buildAliasNodeValue(measurementKey));
+            readWriteHandler.executeBatch(batch);
+          } else {
+            throw new AliasAlreadyExistException(levelPath, alias);
+          }
+        } finally {
+          lock.unlock();
+        }
+      } else {
+        throw new AcquireLockTimeoutException("acquire lock timeout: " + 
levelPath);
+      }
+    } else {
+      readWriteHandler.executeBatch(batch);
+    }
+  }
+
+  /**
+   * @param prefixPath
+   * @param measurements
+   * @param dataTypes
+   * @param encodings
+   * @param compressors
+   * @throws MetadataException
+   */
+  @Override
+  public void createAlignedTimeSeries(
+      PartialPath prefixPath,
+      List<String> measurements,
+      List<TSDataType> dataTypes,
+      List<TSEncoding> encodings,
+      List<CompressionType> compressors)
+      throws MetadataException {
+    createAlignedTimeSeries(
+        new CreateAlignedTimeSeriesPlan(
+            prefixPath, measurements, dataTypes, encodings, compressors, 
null));
+  }
+
+  /**
+   * create aligned timeseries
+   *
+   * @param plan CreateAlignedTimeSeriesPlan
+   */
+  public void createAlignedTimeSeries(CreateAlignedTimeSeriesPlan plan) throws 
MetadataException {
+    PartialPath prefixPath = plan.getPrefixPath();
+    List<String> measurements = plan.getMeasurements();
+    List<TSDataType> dataTypes = plan.getDataTypes();
+    List<TSEncoding> encodings = plan.getEncodings();
+
+    if (prefixPath.getNodeLength() > MAX_PATH_DEPTH - 1) {
+      String.format(
+          "Prefix path is too long, provide: %d, max: %d",
+          prefixPath.getNodeLength(), MRocksDBManager.MAX_PATH_DEPTH - 1);
+    }
+
+    for (int i = 0; i < measurements.size(); i++) {
+      SchemaUtils.checkDataTypeWithEncoding(dataTypes.get(i), 
encodings.get(i));
+      MetaFormatUtils.checkNodeName(measurements.get(i));
+    }
+
+    int sgIndex = ensureStorageGroup(prefixPath);
+
+    try {
+      createEntityRecursively(
+          prefixPath.getNodes(), prefixPath.getNodeLength(), sgIndex + 1, 
true, new Stack<>());
+      WriteBatch batch = new WriteBatch();
+      String[] locks = new String[measurements.size()];
+      for (int i = 0; i < measurements.size(); i++) {
+        String measurement = measurements.get(i);
+        String levelPath = 
RocksDBUtils.getMeasurementLevelPath(prefixPath.getNodes(), measurement);
+        locks[i] = levelPath;
+        MeasurementSchema schema =
+            new MeasurementSchema(measurement, dataTypes.get(i), 
encodings.get(i));
+        byte[] key = RocksDBUtils.toMeasurementNodeKey(levelPath);
+        byte[] value = RocksDBUtils.buildMeasurementNodeValue(schema, null, 
null, null);
+        batch.put(key, value);
+      }
+
+      Stack<Lock> acquiredLock = new Stack<>();
+      try {
+        for (String lockKey : locks) {
+          Lock lock = locksPool.computeIfAbsent(lockKey, x -> new 
ReentrantLock());
+          if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+            acquiredLock.push(lock);
+            if (readWriteHandler.keyExistByAllTypes(lockKey).existAnyKey()) {
+              throw new PathAlreadyExistException(lockKey);
+            }
+          } else {
+            throw new AcquireLockTimeoutException("acquire lock timeout: " + 
lockKey);
+          }
+        }
+        readWriteHandler.executeBatch(batch);
+      } finally {
+        while (!acquiredLock.isEmpty()) {
+          Lock lock = acquiredLock.pop();
+          lock.unlock();
+        }
+      }
+      // TODO: update cache if necessary
+    } catch (InterruptedException | RocksDBException | IOException e) {
+      throw new MetadataException(e);
+    }
+  }
+
+  /**
+   * The method assume Storage Group Node has been created
+   *
+   * @param nodes
+   * @param start
+   * @param end
+   * @param aligned
+   */
+  private void createEntityRecursively(
+      String[] nodes, int start, int end, boolean aligned, Stack<Lock> 
lockedLocks)
+      throws RocksDBException, MetadataException, InterruptedException {
+    if (start <= end) {
+      // nodes before "end" must exist
+      return;
+    }
+    String levelPath = RocksDBUtils.getLevelPath(nodes, start - 1);
+    Lock lock = locksPool.computeIfAbsent(levelPath, x -> new ReentrantLock());
+    if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+      try {
+        CheckKeyResult checkResult = 
readWriteHandler.keyExistByAllTypes(levelPath);
+        if (!checkResult.existAnyKey()) {
+          createEntityRecursively(nodes, start - 1, end, aligned, lockedLocks);
+          if (start == nodes.length) {
+            byte[] nodeKey = RocksDBUtils.toEntityNodeKey(levelPath);
+            byte[] value = aligned ? DEFAULT_ALIGNED_ENTITY_VALUE : 
DEFAULT_NODE_VALUE;
+            readWriteHandler.createNode(nodeKey, value);
+          } else {
+            readWriteHandler.createNode(levelPath, RocksDBMNodeType.INTERNAL, 
DEFAULT_NODE_VALUE);
+          }
+        } else {
+          if (start == nodes.length) {
+
+            // make sure sg node and entity node are different
+            // eg.,'root.a' is a storage group path, 'root.a.b' can not be a 
timeseries
+            if (checkResult.getResult(RocksDBMNodeType.STORAGE_GROUP)) {
+              throw new MetadataException("Storage Group Node and Entity Node 
could not be same!");
+            }
+
+            if (!checkResult.getResult(RocksDBMNodeType.ENTITY)) {
+              throw new MetadataException("Node already exists but not 
entity");
+            }
+
+            if ((checkResult.getValue()[1] & FLAG_IS_ALIGNED) != 0) {
+              throw new MetadataException("Entity node exists but not 
aligned");
+            }
+          } else if (checkResult.getResult(RocksDBMNodeType.MEASUREMENT)
+              || checkResult.getResult(RocksDBMNodeType.ALISA)) {
+            throw new MetadataException("Path contains measurement node");
+          }
+        }
+      } catch (Exception e) {
+        while (!lockedLocks.isEmpty()) {
+          lockedLocks.pop().unlock();
+        }
+        throw e;
+      } finally {
+        if (!lockedLocks.isEmpty()) {
+          lockedLocks.pop().unlock();
+        }
+      }
+    } else {
+      while (!lockedLocks.isEmpty()) {
+        lockedLocks.pop().unlock();
+      }
+      throw new AcquireLockTimeoutException("acquire lock timeout: " + 
levelPath);
+    }
+  }
+
+  @Override
+  public String deleteTimeseries(PartialPath pathPattern, boolean 
isPrefixMatch)
+      throws MetadataException {
+    Set<String> failedNames = ConcurrentHashMap.newKeySet();
+    //    Set<String> parentToCheck = ConcurrentHashMap.newKeySet();
+    traverseOutcomeBasins(
+        pathPattern.getNodes(),
+        MAX_PATH_DEPTH,
+        (key, value) -> {
+          String path = null;
+          RMeasurementMNode deletedNode = null;
+          try {
+            path = RocksDBUtils.getPathByInnerName(new String(key));
+            String[] nodes = MetaUtils.splitPathToDetachedPath(path);
+            String levelPath = RocksDBUtils.getLevelPath(nodes, nodes.length - 
1);
+            // Delete measurement node
+            Lock lock = locksPool.computeIfAbsent(levelPath, x -> new 
ReentrantLock());
+            if (lock.tryLock(MAX_LOCK_WAIT_TIME, TimeUnit.MILLISECONDS)) {
+              try {
+                deletedNode = new RMeasurementMNode(path, value);
+                WriteBatch batch = new WriteBatch();
+                // delete the last node of path
+                batch.delete(key);
+                if (deletedNode.getAlias() != null) {
+                  String[] aliasNodes = Arrays.copyOf(nodes, nodes.length);
+                  aliasNodes[nodes.length - 1] = deletedNode.getAlias();
+                  String aliasLevelPath =
+                      RocksDBUtils.getLevelPath(aliasNodes, aliasNodes.length 
- 1);
+                  batch.delete(RocksDBUtils.toAliasNodeKey(aliasLevelPath));
+                }
+                if (deletedNode.getTags() != null && 
!deletedNode.getTags().isEmpty()) {
+                  batch.delete(readWriteHandler.getCFHByName(TABLE_NAME_TAGS), 
key);
+                  // TODO: tags invert index update
+                }
+                readWriteHandler.executeBatch(batch);
+              } finally {
+                lock.unlock();
+              }
+            } else {
+              throw new AcquireLockTimeoutException("acquire lock timeout, " + 
path);
+            }
+
+            //            if (nodes.length > 1) {
+            //              // Only try to delete directly parent if no other 
siblings
+            //              parentToCheck.add(
+            //                  String.join(PATH_SEPARATOR, 
Arrays.copyOf(nodes, nodes.length -
+            // 1)));
+            //            }
+          } catch (IllegalPathException e) {
+          } catch (Exception e) {
+            logger.error("delete timeseries [{}] fail", path, e);
+            failedNames.add(path);
+            return false;
+          }
+          return true;
+        },
+        new Character[] {NODE_TYPE_MEASUREMENT});
+
+    // TODO: do we need to delete parent??
+    //    parentToCheck
+    //        .parallelStream()
+    //        .forEach(

Review comment:
       It is not recommended to have a large amount of comment code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to