MarcosZyk commented on a change in pull request #5295:
URL: https://github.com/apache/iotdb/pull/5295#discussion_r840634715



##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/MetadataManagerType.java
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata;
+
+public enum MetadataManagerType {
+  MEMORY_MANAGER,
+  ROCKSDB_MANAGER;
+
+  public static MetadataManagerType of(String value) {
+    try {
+      return Enum.valueOf(MetadataManagerType.class, value);
+    } catch (Exception e) {
+      return MEMORY_MANAGER;
+    }
+  }

Review comment:
       Please rename to ```SchemaEngineType```.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RMeasurementMNode.java
##########
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.engine.trigger.executor.TriggerExecutor;
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.path.MeasurementPath;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import 
org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
+import org.apache.iotdb.db.metadata.template.Template;
+import org.apache.iotdb.tsfile.file.metadata.enums.TSDataType;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+import org.apache.iotdb.tsfile.write.schema.IMeasurementSchema;
+import org.apache.iotdb.tsfile.write.schema.MeasurementSchema;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class RMeasurementMNode extends RMNode implements IMeasurementMNode {
+
+  protected String alias;
+
+  private IMeasurementSchema schema;
+
+  private Map<String, String> tags;
+
+  private Map<String, String> attributes;
+
+  /**
+   * Constructor of MNode.
+   *
+   * @param fullPath
+   */
+  public RMeasurementMNode(String fullPath, RSchemaReadWriteHandler 
readWriteHandler) {
+    super(fullPath, readWriteHandler);
+  }
+
+  public RMeasurementMNode(
+      String fullPath, byte[] value, RSchemaReadWriteHandler readWriteHandler) 
{
+    super(fullPath, readWriteHandler);
+    deserialize(value);
+  }
+
+  @Override
+  public IEntityMNode getParent() {
+    if (super.getParent() == null) {
+      return null;
+    }
+    return parent.getAsEntityMNode();
+  }
+
+  @Override
+  public MeasurementPath getMeasurementPath() {
+    MeasurementPath result = new MeasurementPath(super.getPartialPath(), 
schema);
+    result.setUnderAlignedEntity(getParent().isAligned());
+    if (alias != null && !alias.isEmpty()) {
+      result.setMeasurementAlias(alias);
+    }
+    return result;
+  }
+
+  @Override
+  public IMeasurementSchema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public TSDataType getDataType(String measurementId) {
+    return schema.getType();
+  }
+
+  // unsupported exceptions
+  @Override
+  public long getOffset() {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void setOffset(long offset) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public String getAlias() {
+    return alias;
+  }
+
+  @Override
+  public void setAlias(String alias) {
+    this.alias = alias;
+  }
+
+  @Override
+  public TriggerExecutor getTriggerExecutor() {
+    return null;
+  }

Review comment:
       The Trigger feature seems to be unavailable when using the rocksdb-based 
schemaEngine. Maybe we should have more discussion with @SteveYurongSu about how 
to implement triggers, since the same problem also affects our schema_file_based 
schemaEngine.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/SchemaDataTransfer.java
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.logfile.MLogReader;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.mtree.MTreeAboveSG;
+import org.apache.iotdb.db.metadata.mtree.MTreeBelowSG;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Use the class to transfer data from mlog.bin and mtree-*.snapshot to 
rocksdb based manager */
+public class SchemaDataTransfer {
+

Review comment:
       I think this is a tool for transferring schema from mlog to rocksdb. Shall 
we implement this as a formal tool that supports a user-specified mlog path?

##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RInternalMNode.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import 
org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
+import org.apache.iotdb.db.metadata.template.Template;
+
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class RInternalMNode extends RMNode {
+
+  // schema template
+  protected Template schemaTemplate = null;
+
+  private volatile boolean useTemplate = false;
+
+  /**
+   * Constructor of MNode.
+   *
+   * @param fullPath
+   */
+  public RInternalMNode(String fullPath, RSchemaReadWriteHandler 
readWriteHandler) {
+    super(fullPath, readWriteHandler);
+  }
+
+  /** check whether the MNode has a child with the name */
+  @Override
+  public boolean hasChild(String name) {
+    String childPathName = 
fullPath.concat(RSchemaConstants.PATH_SEPARATOR).concat(name);
+    IMNode node = getNodeBySpecifiedPath(childPathName);
+    return node != null;
+  }
+
+  /** get the child with the name */
+  @Override
+  public IMNode getChild(String name) {
+    String childPathName = 
fullPath.concat(RSchemaConstants.PATH_SEPARATOR).concat(name);
+    return getNodeBySpecifiedPath(childPathName);
+  }
+
+  /**
+   * add a child to current mnode
+   *
+   * @param name child's name
+   * @param child child's node
+   * @return
+   */
+  @Override
+  public IMNode addChild(String name, IMNode child) {
+    child.setParent(this);
+    String childName = 
fullPath.concat(RSchemaConstants.PATH_SEPARATOR).concat(name);
+    int childNameMaxLevel = RSchemaUtils.getLevelByPartialPath(childName);
+    try {
+      if (child instanceof RStorageGroupMNode) {
+        String innerName =
+            RSchemaUtils.convertPartialPathToInner(
+                childName, childNameMaxLevel, 
RMNodeType.STORAGE_GROUP.getValue());
+        long ttl = ((RStorageGroupMNode) child).getDataTTL();
+        readWriteHandler.updateNode(
+            innerName.getBytes(), 
RSchemaUtils.updateTTL(RSchemaConstants.DEFAULT_NODE_VALUE, ttl));
+      } else if (child instanceof REntityMNode) {
+        String innerName =
+            RSchemaUtils.convertPartialPathToInner(
+                childName, childNameMaxLevel, RMNodeType.ENTITY.getValue());
+        readWriteHandler.updateNode(innerName.getBytes(), new byte[] {});
+      } else if (child instanceof RInternalMNode) {
+        String innerName =
+            RSchemaUtils.convertPartialPathToInner(
+                childName, childNameMaxLevel, RMNodeType.INTERNAL.getValue());
+        readWriteHandler.updateNode(innerName.getBytes(), new byte[] {});
+      } else if (child instanceof RMeasurementMNode) {
+        String innerName =
+            RSchemaUtils.convertPartialPathToInner(
+                childName, childNameMaxLevel, 
RMNodeType.MEASUREMENT.getValue());
+        // todo all existing attributes of the measurementNode need to be 
written
+        readWriteHandler.updateNode(innerName.getBytes(), new byte[] {});
+      }
+    } catch (RocksDBException e) {
+      logger.error(e.getMessage());
+    }
+    return child;
+  }
+
+  /**
+   * Add a child to the current mnode.
+   *
+   * <p>This method will not take the child's name as one of the inputs and 
will also make this
+   * Mnode be child node's parent. All is to reduce the probability of 
mistaken by users and be more
+   * convenient for users to use. And the return of this method is used to 
conveniently construct a
+   * chain of time series for users.
+   *
+   * @param child child's node
+   * @return return the MNode already added
+   */
+  @Override
+  public IMNode addChild(IMNode child) {
+    addChild(child.getName(), child);
+    return child;
+  }
+
+  /** delete a child */
+  @Override
+  public void deleteChild(String name) {
+    String childPathName = 
fullPath.concat(RSchemaConstants.PATH_SEPARATOR).concat(name);
+    int nodeNameMaxLevel = RSchemaUtils.getLevelByPartialPath(childPathName);
+    for (RMNodeType type : RMNodeType.values()) {
+      byte[] childInnerName =
+          RSchemaUtils.convertPartialPathToInner(childPathName, 
nodeNameMaxLevel, type.getValue())
+              .getBytes();
+      try {
+        if (readWriteHandler.keyExist(childInnerName)) {
+          readWriteHandler.deleteByKey(childInnerName);
+          return;
+        }
+      } catch (RocksDBException e) {
+        logger.error(e.getMessage());
+      }
+    }
+  }
+
+  /**
+   * replace a child of this mnode
+   *
+   * @param oldChildName measurement name
+   * @param newChildNode new child node
+   */
+  @Override
+  public void replaceChild(String oldChildName, IMNode newChildNode) {
+    IMNode oldChildNode = this.getChild(oldChildName);
+    if (oldChildNode == null) {
+      return;
+    }
+
+    // newChildNode builds parent-child relationship
+    Map<String, IMNode> grandChildren = oldChildNode.getChildren();
+    if (!grandChildren.isEmpty()) {
+      newChildNode.setChildren(grandChildren);
+      grandChildren.forEach(
+          (grandChildName, grandChildNode) -> 
grandChildNode.setParent(newChildNode));
+    }
+
+    if (newChildNode.isEntity() && oldChildNode.isEntity()) {
+      Map<String, IMeasurementMNode> grandAliasChildren =
+          oldChildNode.getAsEntityMNode().getAliasChildren();
+      if (!grandAliasChildren.isEmpty()) {
+        newChildNode.getAsEntityMNode().setAliasChildren(grandAliasChildren);
+        grandAliasChildren.forEach(
+            (grandAliasChildName, grandAliasChild) -> 
grandAliasChild.setParent(newChildNode));
+      }
+      
newChildNode.getAsEntityMNode().setUseTemplate(oldChildNode.isUseTemplate());
+    }
+
+    newChildNode.setSchemaTemplate(oldChildNode.getSchemaTemplate());
+
+    newChildNode.setParent(this);
+
+    this.deleteChild(oldChildName);
+    this.addChild(newChildNode.getName(), newChildNode);
+  }

Review comment:
       This method has been rewritten in the current master branch. Please refer 
to the related code and update this accordingly.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/RInternalMNode.java
##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import 
org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaUtils;
+import org.apache.iotdb.db.metadata.template.Template;
+
+import org.rocksdb.RocksDBException;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class RInternalMNode extends RMNode {
+
+  // schema template
+  protected Template schemaTemplate = null;
+
+  private volatile boolean useTemplate = false;
+
+  /**
+   * Constructor of MNode.
+   *
+   * @param fullPath
+   */
+  public RInternalMNode(String fullPath, RSchemaReadWriteHandler 
readWriteHandler) {
+    super(fullPath, readWriteHandler);
+  }
+
+  /** check whether the MNode has a child with the name */
+  @Override
+  public boolean hasChild(String name) {
+    String childPathName = 
fullPath.concat(RSchemaConstants.PATH_SEPARATOR).concat(name);
+    IMNode node = getNodeBySpecifiedPath(childPathName);
+    return node != null;
+  }
+
+  /** get the child with the name */
+  @Override
+  public IMNode getChild(String name) {
+    String childPathName = 
fullPath.concat(RSchemaConstants.PATH_SEPARATOR).concat(name);
+    return getNodeBySpecifiedPath(childPathName);
+  }
+
+  /**
+   * add a child to current mnode
+   *
+   * @param name child's name
+   * @param child child's node
+   * @return
+   */
+  @Override
+  public IMNode addChild(String name, IMNode child) {
+    child.setParent(this);
+    String childName = 
fullPath.concat(RSchemaConstants.PATH_SEPARATOR).concat(name);
+    int childNameMaxLevel = RSchemaUtils.getLevelByPartialPath(childName);
+    try {
+      if (child instanceof RStorageGroupMNode) {
+        String innerName =
+            RSchemaUtils.convertPartialPathToInner(
+                childName, childNameMaxLevel, 
RMNodeType.STORAGE_GROUP.getValue());
+        long ttl = ((RStorageGroupMNode) child).getDataTTL();
+        readWriteHandler.updateNode(
+            innerName.getBytes(), 
RSchemaUtils.updateTTL(RSchemaConstants.DEFAULT_NODE_VALUE, ttl));
+      } else if (child instanceof REntityMNode) {
+        String innerName =
+            RSchemaUtils.convertPartialPathToInner(
+                childName, childNameMaxLevel, RMNodeType.ENTITY.getValue());
+        readWriteHandler.updateNode(innerName.getBytes(), new byte[] {});
+      } else if (child instanceof RInternalMNode) {
+        String innerName =
+            RSchemaUtils.convertPartialPathToInner(
+                childName, childNameMaxLevel, RMNodeType.INTERNAL.getValue());
+        readWriteHandler.updateNode(innerName.getBytes(), new byte[] {});
+      } else if (child instanceof RMeasurementMNode) {
+        String innerName =
+            RSchemaUtils.convertPartialPathToInner(
+                childName, childNameMaxLevel, 
RMNodeType.MEASUREMENT.getValue());
+        // todo all existing attributes of the measurementNode need to be 
written
+        readWriteHandler.updateNode(innerName.getBytes(), new byte[] {});
+      }

Review comment:
       Maybe it would be better to split this method across the RMNode subclasses 
via an interface and method overriding. It is easy to forget some attributes when 
maintaining such a huge if-else code block.

##########
File path: server/src/assembly/resources/conf/iotdb-engine.properties
##########
@@ -505,6 +505,14 @@ timestamp_precision=ms
 # Datatype: int
 # query_timeout_threshold=60000
 
+####################
+### Metadata Configuration
+####################
+
+# Which metadata manager to be used, right now MEMORY_MANAGER and 
ROCKSDB_MANAGER are supported. Default MEMORY_MANAGER.
+# Datatype: string
+# meta_data_manager=MEMORY_MANAGER

Review comment:
       Please rename to schema_engine_type, and the enum values could be 
```memory_based```, ```rocksdb_based``` and ```schema_file_based```. This 
parameter can be shared with our persistent schema feature.

##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/mnode/REntityMNode.java
##########
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb.mnode;
+
+import org.apache.iotdb.db.metadata.lastCache.container.ILastCacheContainer;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IEntityMNode;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaConstants;
+import 
org.apache.iotdb.db.metadata.schemaregion.rocksdb.RSchemaReadWriteHandler;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class REntityMNode extends RInternalMNode implements IEntityMNode {
+
+  private volatile boolean isAligned = false;
+
+  /**
+   * Constructor of MNode.
+   *
+   * @param fullPath
+   */
+  public REntityMNode(String fullPath, RSchemaReadWriteHandler 
readWriteHandler) {
+    super(fullPath, readWriteHandler);
+  }
+
+  public REntityMNode(String fullPath, byte[] value, RSchemaReadWriteHandler 
readWriteHandler) {
+    super(fullPath, readWriteHandler);
+    deserialize(value);
+  }
+
+  @Override
+  public boolean addAlias(String alias, IMeasurementMNode child) {
+
+    // todo add alias
+    return true;

Review comment:
       We hope this feature (alias support) can be implemented. 

##########
File path: 
server/src/main/java/org/apache/iotdb/db/metadata/schemaregion/rocksdb/SchemaDataTransfer.java
##########
@@ -0,0 +1,446 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.metadata.schemaregion.rocksdb;
+
+import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.engine.fileSystem.SystemFileFactory;
+import org.apache.iotdb.db.exception.metadata.AcquireLockTimeoutException;
+import org.apache.iotdb.db.exception.metadata.MetadataException;
+import org.apache.iotdb.db.metadata.MetadataConstant;
+import org.apache.iotdb.db.metadata.logfile.MLogReader;
+import org.apache.iotdb.db.metadata.logfile.MLogWriter;
+import org.apache.iotdb.db.metadata.mnode.IMeasurementMNode;
+import org.apache.iotdb.db.metadata.mnode.IStorageGroupMNode;
+import org.apache.iotdb.db.metadata.mtree.MTreeAboveSG;
+import org.apache.iotdb.db.metadata.mtree.MTreeBelowSG;
+import org.apache.iotdb.db.metadata.path.PartialPath;
+import org.apache.iotdb.db.qp.physical.PhysicalPlan;
+import org.apache.iotdb.db.qp.physical.sys.AutoCreateDeviceMNodePlan;
+import org.apache.iotdb.db.qp.physical.sys.ChangeAliasPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateAlignedTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.CreateTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.DeleteTimeSeriesPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetStorageGroupPlan;
+import org.apache.iotdb.db.qp.physical.sys.SetTTLPlan;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ForkJoinPool;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/** Use the class to transfer data from mlog.bin and mtree-*.snapshot to 
rocksdb based manager */
+public class SchemaDataTransfer {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(SchemaDataTransfer.class);
+
+  private static final int DEFAULT_TRANSFER_THREAD_POOL_SIZE = 200;
+  private static final int DEFAULT_TRANSFER_PLANS_BUFFER_SIZE = 100_000;
+
+  private final ForkJoinPool forkJoinPool = new 
ForkJoinPool(DEFAULT_TRANSFER_THREAD_POOL_SIZE);
+
+  private String mtreeSnapshotPath;
+  private RSchemaRegion rocksDBManager;
+  private MLogWriter mLogWriter;
+  private final String FAILED_MLOG_PATH =
+      IoTDBDescriptor.getInstance().getConfig().getSchemaDir()
+          + File.separator
+          + MetadataConstant.METADATA_LOG
+          + ".transfer_failed";
+
+  private final String IDX_FILE_PATH =
+      RSchemaReadWriteHandler.ROCKSDB_PATH + File.separator + 
"transfer_mlog.idx";
+
+  private final AtomicInteger failedPlanCount = new AtomicInteger(0);
+  private final List<PhysicalPlan> retryPlans = new ArrayList<>();
+
+  SchemaDataTransfer() throws MetadataException {
+    rocksDBManager = new RSchemaRegion();
+  }
+
+  public static void main(String[] args) {
+    try {
+      SchemaDataTransfer transfer = new SchemaDataTransfer();
+      transfer.doTransfer();
+    } catch (MetadataException | IOException | ExecutionException | 
InterruptedException e) {
+      logger.error(e.getMessage());
+    }
+  }
+
+  public void doTransfer() throws IOException, ExecutionException, 
InterruptedException {
+    File failedFile = new File(FAILED_MLOG_PATH);
+    if (failedFile.exists()) {
+      logger.info("Failed file [" + FAILED_MLOG_PATH + "] delete:" + 
failedFile.delete());
+    }
+
+    mLogWriter = new MLogWriter(FAILED_MLOG_PATH);
+    mLogWriter.setLogNum(0);
+
+    String schemaDir = 
IoTDBDescriptor.getInstance().getConfig().getSchemaDir();
+    File schemaFolder = SystemFileFactory.INSTANCE.getFile(schemaDir);
+    if (!schemaFolder.exists()) {
+      if (schemaFolder.mkdirs()) {
+        logger.info("create system folder {}", schemaFolder.getAbsolutePath());
+      } else {
+        logger.info("create system folder {} failed.", 
schemaFolder.getAbsolutePath());
+      }
+    }
+
+    mtreeSnapshotPath = schemaDir + File.separator + 
MetadataConstant.MTREE_SNAPSHOT;
+    File mtreeSnapshot = SystemFileFactory.INSTANCE.getFile(mtreeSnapshotPath);
+    if (mtreeSnapshot.exists()) {
+      try {
+        doTransferFromSnapshot();
+      } catch (MetadataException e) {
+        logger.error("Fatal error, terminate data transfer!!!", e);
+      }
+    }
+
+    String logFilePath = schemaDir + File.separator + 
MetadataConstant.METADATA_LOG;
+    File logFile = SystemFileFactory.INSTANCE.getFile(logFilePath);
+    // init the metadata from the operation log
+    if (logFile.exists()) {
+      try (MLogReader mLogReader = new MLogReader(schemaDir, 
MetadataConstant.METADATA_LOG)) {
+        int startIdx = 0;
+        File idxFile = new File(IDX_FILE_PATH);
+        if (idxFile.exists()) {
+          try (BufferedReader br = new BufferedReader(new 
FileReader(idxFile))) {
+            String idxStr = br.readLine();
+            if (StringUtils.isNotEmpty(idxStr)) {
+              startIdx = Integer.valueOf(idxStr);
+            }
+          }
+        }
+        transferFromMLog(mLogReader, startIdx);
+      } catch (Exception e) {
+        throw new IOException("Failed to parser mlog.bin for err:" + e);
+      }
+    } else {
+      logger.info("No mlog.bin file find, skip data transfer");
+    }
+    mLogWriter.close();
+
+    logger.info("Transfer metadata from MManager to MRocksDBManager 
complete!");
+  }
+
+  private void transferFromMLog(MLogReader mLogReader, long startIdx)
+      throws IOException, MetadataException, ExecutionException, 
InterruptedException {
+    long time = System.currentTimeMillis();
+    logger.info("start from {} to transfer data from mlog.bin", startIdx);
+    int currentIdx = 0;
+    PhysicalPlan plan;
+    List<PhysicalPlan> nonCollisionCollections = new ArrayList<>();
+    while (mLogReader.hasNext()) {
+      try {
+        plan = mLogReader.next();
+        currentIdx++;
+        if (currentIdx <= startIdx) {
+          continue;
+        }
+      } catch (Exception e) {
+        logger.error("Parse mlog error at lineNumber {} because:", currentIdx, 
e);
+        throw e;
+      }
+      if (plan == null) {
+        continue;
+      }
+
+      switch (plan.getOperatorType()) {
+        case CREATE_TIMESERIES:
+        case CREATE_ALIGNED_TIMESERIES:
+        case AUTO_CREATE_DEVICE_MNODE:
+          nonCollisionCollections.add(plan);
+          if (nonCollisionCollections.size() > 
DEFAULT_TRANSFER_PLANS_BUFFER_SIZE) {
+            executeBufferedOperation(nonCollisionCollections);
+          }
+          break;
+        case SET_STORAGE_GROUP:
+        case TTL:
+        case CHANGE_ALIAS:
+        case DELETE_TIMESERIES:
+          executeBufferedOperation(nonCollisionCollections);
+          try {
+            rocksDBManager.operation(plan);
+          } catch (IOException e) {
+            logger.warn("operate [" + plan.getOperatorType().toString() + "] 
failed, try again.");
+            rocksDBManager.operation(plan);
+          } catch (MetadataException e) {
+            logger.error("Can not operate cmd {} for err:", 
plan.getOperatorType(), e);
+          }
+          break;
+        case DELETE_STORAGE_GROUP:
+          DeleteStorageGroupPlan deleteStorageGroupPlan = 
(DeleteStorageGroupPlan) plan;
+          for (PartialPath path : deleteStorageGroupPlan.getPaths()) {
+            logger.info("delete storage group: {}", path.getFullPath());
+          }
+          break;
+        case CHANGE_TAG_OFFSET:
+        case CREATE_TEMPLATE:
+        case DROP_TEMPLATE:
+        case APPEND_TEMPLATE:
+        case PRUNE_TEMPLATE:
+        case SET_TEMPLATE:
+        case ACTIVATE_TEMPLATE:
+        case UNSET_TEMPLATE:
+        case CREATE_CONTINUOUS_QUERY:
+        case DROP_CONTINUOUS_QUERY:
+          logger.error("unsupported operations {}", plan);
+          break;
+        default:
+          logger.error("Unrecognizable command {}", plan.getOperatorType());
+      }
+    }
+
+    executeBufferedOperation(nonCollisionCollections);
+
+    if (retryPlans.size() > 0) {
+      for (PhysicalPlan retryPlan : retryPlans) {
+        try {
+          rocksDBManager.operation(retryPlan);
+        } catch (MetadataException e) {
+          logger.error("Execute plan failed: {}", retryPlan, e);
+        } catch (Exception e) {
+          persistFailedLog(retryPlan);
+        }
+      }
+    }
+
+    File idxFile = new File(IDX_FILE_PATH);
+    try (FileWriter writer = new FileWriter(idxFile)) {
+      writer.write(String.valueOf(currentIdx));
+    }
+    logger.info(
+        "Transfer data from mlog.bin complete after {}ms with {} errors",
+        System.currentTimeMillis() - time,
+        failedPlanCount.get());
+  }
+
+  private void executeBufferedOperation(List<PhysicalPlan> plans)
+      throws ExecutionException, InterruptedException {
+    if (plans.size() <= 0) {
+      return;
+    }
+    forkJoinPool
+        .submit(
+            () ->
+                plans
+                    .parallelStream()
+                    .forEach(
+                        x -> {
+                          try {
+                            rocksDBManager.operation(x);
+                          } catch (MetadataException e) {
+                            if (e instanceof AcquireLockTimeoutException) {
+                              retryPlans.add(x);
+                            } else {
+                              logger.error("Execute plan failed: {}", x, e);
+                            }
+                          } catch (Exception e) {
+                            retryPlans.add(x);
+                          }
+                        }))
+        .get();
+    logger.debug("parallel executed {} operations", plans.size());
+    plans.clear();
+  }
+
+  private void persistFailedLog(PhysicalPlan plan) {
+    logger.info("persist failed plan: {}", plan.toString());
+    failedPlanCount.incrementAndGet();
+    try {
+      switch (plan.getOperatorType()) {
+        case CREATE_TIMESERIES:
+          mLogWriter.createTimeseries((CreateTimeSeriesPlan) plan);
+          break;
+        case CREATE_ALIGNED_TIMESERIES:
+          mLogWriter.createAlignedTimeseries((CreateAlignedTimeSeriesPlan) 
plan);
+          break;
+        case AUTO_CREATE_DEVICE_MNODE:
+          mLogWriter.autoCreateDeviceMNode((AutoCreateDeviceMNodePlan) plan);
+          break;
+        case DELETE_TIMESERIES:
+          mLogWriter.deleteTimeseries((DeleteTimeSeriesPlan) plan);
+          break;
+        case SET_STORAGE_GROUP:
+          SetStorageGroupPlan setStorageGroupPlan = (SetStorageGroupPlan) plan;
+          mLogWriter.setStorageGroup(setStorageGroupPlan.getPath());
+          break;
+        case DELETE_STORAGE_GROUP:
+          DeleteStorageGroupPlan deletePlan = (DeleteStorageGroupPlan) plan;
+          for (PartialPath path : deletePlan.getPaths()) {
+            mLogWriter.deleteStorageGroup(path);
+          }
+          break;
+        case TTL:
+          SetTTLPlan ttlPlan = (SetTTLPlan) plan;
+          mLogWriter.setTTL(ttlPlan.getStorageGroup(), ttlPlan.getDataTTL());
+          break;
+        case CHANGE_ALIAS:
+          ChangeAliasPlan changeAliasPlan = (ChangeAliasPlan) plan;
+          mLogWriter.changeAlias(changeAliasPlan.getPath(), 
changeAliasPlan.getAlias());
+          break;
+        case CHANGE_TAG_OFFSET:
+        case CREATE_TEMPLATE:
+        case DROP_TEMPLATE:
+        case APPEND_TEMPLATE:
+        case PRUNE_TEMPLATE:
+        case SET_TEMPLATE:
+        case ACTIVATE_TEMPLATE:
+        case UNSET_TEMPLATE:
+        case CREATE_CONTINUOUS_QUERY:
+        case DROP_CONTINUOUS_QUERY:
+          throw new 
UnsupportedOperationException(plan.getOperatorType().toString());
+        default:
+          logger.error("Unrecognizable command {}", plan.getOperatorType());
+      }
+    } catch (IOException e) {
+      logger.error(
+          "Fatal error, exception when persist failed plan, metadata transfer 
should be failed", e);
+      throw new RuntimeException("Terminate transfer as persist log fail.");
+    }
+  }
+
+  @SuppressWarnings("squid:S3776") // Suppress high Cognitive Complexity 
warning
+  private void doTransferFromSnapshot() throws MetadataException {
+    logger.info("Start transfer data from snapshot");
+    long start = System.currentTimeMillis();
+
+    MTreeAboveSG mTree = new MTreeAboveSG();
+    //    mTree.init();
+    //    List<IStorageGroupMNode> storageGroupNodes = 
mTree.getAllStorageGroupNodes();
+    //
+    AtomicInteger errorCount = new AtomicInteger(0);
+    //    storageGroupNodes
+    //        .parallelStream()
+    //        .forEach(
+    //            sgNode -> {
+    //              try {
+    //                rocksDBManager.setStorageGroup(sgNode.getPartialPath());
+    //                if (sgNode.getDataTTL() > 0) {
+    //                  rocksDBManager.setTTL(sgNode.getPartialPath(), 
sgNode.getDataTTL());
+    //                }
+    //              } catch (MetadataException | IOException e) {
+    //                if (!(e instanceof StorageGroupAlreadySetException)
+    //                    && !(e instanceof PathAlreadyExistException)
+    //                    && !(e instanceof AliasAlreadyExistException)) {
+    //                  errorCount.incrementAndGet();
+    //                }
+    //                logger.error(
+    //                    "create storage group {} failed", 
sgNode.getPartialPath().getFullPath(),
+    // e);
+    //              }

Review comment:
       Please remove the commented-out code instead of leaving it disabled.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to